diff --git a/core/src/main/java/com/softwaremill/jox/Channel.java b/core/src/main/java/com/softwaremill/jox/Channel.java index 6452f2f..c25ae44 100644 --- a/core/src/main/java/com/softwaremill/jox/Channel.java +++ b/core/src/main/java/com/softwaremill/jox/Channel.java @@ -3,8 +3,6 @@ import java.lang.invoke.MethodHandles; import java.lang.invoke.VarHandle; import java.util.Comparator; -import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.LockSupport; import java.util.function.Function; import java.util.function.Supplier; @@ -88,7 +86,13 @@ operations on these (previous) segments, and we'll end up wanting to remove such operations won't use them, so the relinking won't be useful. */ + // immutable state + private final int capacity; + private final boolean isRendezvous; + private final boolean isUnlimited; + + // mutable state /** * The total number of `send` operations ever invoked, and a flag indicating if the channel is closed. @@ -96,21 +100,42 @@ operations on these (previous) segments, and we'll end up wanting to remove such *

* Each {@link Channel#send} invocation gets a unique cell to process. */ - private final AtomicLong sendersAndClosedFlag = new AtomicLong(0L); - private final AtomicLong receivers = new AtomicLong(0L); - private final AtomicLong bufferEnd; + private volatile long sendersAndClosedFlag = 0L; + private volatile long receivers = 0L; + private volatile long bufferEnd; /** * Segments holding cell states. State can be {@link CellState}, {@link Continuation}, {@link SelectInstance}, or a user-provided buffered value. */ - private final AtomicReference sendSegment; - private final AtomicReference receiveSegment; - private final AtomicReference bufferEndSegment; + private volatile Segment sendSegment; + private volatile Segment receiveSegment; + private volatile Segment bufferEndSegment; + private volatile ChannelClosed closedReason; - private final AtomicReference closedReason; + // var handles - private final boolean isRendezvous; - private final boolean isUnlimited; + private static final VarHandle SENDERS_AND_CLOSE_FLAG; + private static final VarHandle RECEIVERS; + private static final VarHandle BUFFER_END; + private static final VarHandle SEND_SEGMENT; + private static final VarHandle RECEIVE_SEGMENT; + private static final VarHandle BUFFER_END_SEGMENT; + private static final VarHandle CLOSED_REASON; + + static { + try { + MethodHandles.Lookup l = MethodHandles.privateLookupIn(Channel.class, MethodHandles.lookup()); + SENDERS_AND_CLOSE_FLAG = l.findVarHandle(Channel.class, "sendersAndClosedFlag", long.class); + RECEIVERS = l.findVarHandle(Channel.class, "receivers", long.class); + BUFFER_END = l.findVarHandle(Channel.class, "bufferEnd", long.class); + SEND_SEGMENT = l.findVarHandle(Channel.class, "sendSegment", Segment.class); + RECEIVE_SEGMENT = l.findVarHandle(Channel.class, "receiveSegment", Segment.class); + BUFFER_END_SEGMENT = l.findVarHandle(Channel.class, "bufferEndSegment", Segment.class); + CLOSED_REASON = l.findVarHandle(Channel.class, "closedReason", ChannelClosed.class); + } catch (ReflectiveOperationException e) { + throw new ExceptionInInitializerError(e); + } + } /** * Creates a rendezvous channel. @@ -134,15 +159,13 @@ public Channel(int capacity) { var firstSegment = new Segment(0, null, isRendezvousOrUnlimited ? 2 : 3, isRendezvousOrUnlimited); - sendSegment = new AtomicReference<>(firstSegment); - receiveSegment = new AtomicReference<>(firstSegment); + sendSegment = firstSegment; + receiveSegment = firstSegment; // If the capacity is 0 or -1, buffer expansion never happens, so the buffer end segment points to a null segment, // not the first one. This is also reflected in the pointer counter of firstSegment. - bufferEndSegment = new AtomicReference<>(isRendezvousOrUnlimited ? Segment.NULL_SEGMENT : firstSegment); - - bufferEnd = new AtomicLong(capacity); + bufferEndSegment = isRendezvousOrUnlimited ? Segment.NULL_SEGMENT : firstSegment; - closedReason = new AtomicReference<>(null); + bufferEnd = capacity; } public static Channel newUnlimitedChannel() { @@ -178,9 +201,9 @@ private Object doSend(T value, SelectInstance select, SelectClause selectClau } while (true) { // reading the segment before the counter increment - this is needed to find the required segment later - var segment = sendSegment.get(); + var segment = sendSegment; // reserving the next cell - var scf = sendersAndClosedFlag.getAndIncrement(); + var scf = (long) SENDERS_AND_CLOSE_FLAG.getAndAdd(this, 1L); var s = getSendersCounter(scf); // calculating the segment id and the index within the segment @@ -189,16 +212,16 @@ private Object doSend(T value, SelectInstance select, SelectClause selectClau // check if `sendSegment` stores a previous segment, if so move the reference forward if (segment.getId() != id) { - segment = findAndMoveForward(sendSegment, segment, id); + segment = findAndMoveForward(SEND_SEGMENT, this, segment, id); if (segment == null) { // the channel has been closed, `s` points to a segment which doesn't exist - return closedReason.get(); + return closedReason; } // if we still have another segment, the segment must have been removed if (segment.getId() != id) { // skipping all interrupted cells, and trying with a new one - sendersAndClosedFlag.compareAndSet(s, segment.getId() * Segment.SEGMENT_SIZE); + SENDERS_AND_CLOSE_FLAG.compareAndSet(this, s, segment.getId() * Segment.SEGMENT_SIZE); continue; } } @@ -207,7 +230,7 @@ private Object doSend(T value, SelectInstance select, SelectClause selectClau // reference forward, so that segments which become eligible for removal can be GCed (after the channel // is closed, e.g. when the channel is done and there are some values left to be received) if (isClosed(scf)) { - return closedReason.get(); + return closedReason; } var sendResult = updateCellSend(segment, i, s, value, select, selectClause); @@ -233,7 +256,7 @@ private Object doSend(T value, SelectInstance select, SelectClause selectClau // trying again with a new cell } else if (sendResult == SendResult.CLOSED) { // not cleaning the previous segments - the close procedure might still need it - return closedReason.get(); + return closedReason; } else { throw new IllegalStateException("Unexpected result: " + sendResult); } @@ -253,7 +276,7 @@ private Object updateCellSend(Segment segment, int i, long s, T value, SelectIns if (state == null) { // reading the buffer end & receiver's counter if needed - if (!isUnlimited && s >= (isRendezvous ? 0 : bufferEnd.get()) && s >= receivers.get()) { + if (!isUnlimited && s >= (isRendezvous ? 0 : bufferEnd) && s >= receivers) { // cell is empty, and no receiver, not in buffer -> suspend if (select != null) { // cell is empty, no receiver, and we are in a select -> store the select instance @@ -350,9 +373,9 @@ public Object receiveSafe() throws InterruptedException { private Object doReceive(SelectInstance select, SelectClause selectClause) throws InterruptedException { while (true) { // reading the segment before the counter increment - this is needed to find the required segment later - var segment = receiveSegment.get(); + var segment = receiveSegment; // reserving the next cell - var r = receivers.getAndIncrement(); + var r = (long) RECEIVERS.getAndAdd(this, 1L); // calculating the segment id and the index within the segment var id = r / Segment.SEGMENT_SIZE; @@ -360,16 +383,16 @@ private Object doReceive(SelectInstance select, SelectClause selectClause) th // check if `receiveSegment` stores a previous segment, if so move the reference forward if (segment.getId() != id) { - segment = findAndMoveForward(receiveSegment, segment, id); + segment = findAndMoveForward(RECEIVE_SEGMENT, this, segment, id); if (segment == null) { // the channel has been closed, r points to a segment which doesn't exist - return closedReason.get(); + return closedReason; } // if we still have another segment, the segment must have been removed if (segment.getId() != id) { // skipping all interrupted cells, and trying with a new one - receivers.compareAndSet(r, segment.getId() * Segment.SEGMENT_SIZE); + RECEIVERS.compareAndSet(this, r, segment.getId() * Segment.SEGMENT_SIZE); continue; } } @@ -377,7 +400,7 @@ private Object doReceive(SelectInstance select, SelectClause selectClause) th var result = updateCellReceive(segment, i, r, select, selectClause); if (result == ReceiveResult.CLOSED) { // not cleaning the previous segments - the close procedure might still need it - return closedReason.get(); + return closedReason; } else { /* After `updateCellReceive` completes and the channel isn't closed, we can be sure that S > r, unless @@ -419,7 +442,7 @@ private Object updateCellReceive(Segment segment, int i, long r, SelectInstance var state = segment.getCell(i); // reading the current state of the cell; we'll try to update it atomically if (state == null || state == IN_BUFFER) { - if (r >= getSendersCounter(sendersAndClosedFlag.get())) { // reading the sender's counter + if (r >= getSendersCounter(sendersAndClosedFlag)) { // reading the sender's counter if (select != null) { // cell is empty, no sender, and we are in a select -> store the select instance // and await externally @@ -513,9 +536,9 @@ private void expandBuffer() { if (isRendezvous || isUnlimited) return; while (true) { // reading the segment before the counter increment - this is needed to find the required segment later - var segment = bufferEndSegment.get(); + var segment = bufferEndSegment; // reserving the next cell - var b = bufferEnd.getAndIncrement(); + var b = (long) BUFFER_END.getAndAdd(this, 1L); // calculating the segment id and the index within the segment var id = b / Segment.SEGMENT_SIZE; @@ -523,7 +546,7 @@ private void expandBuffer() { // check if `bufferEndSegment` stores a previous segment, if so move the reference forward if (segment.getId() != id) { - segment = findAndMoveForward(bufferEndSegment, segment, id); + segment = findAndMoveForward(BUFFER_END_SEGMENT, this, segment, id); if (segment == null) { // the channel has been closed, b points to a segment which doesn't exist, nowhere to expand return; @@ -535,7 +558,7 @@ private void expandBuffer() { // senders must have already been processed by expandBuffer) if (segment.getId() != id) { // skipping all interrupted cells as an optimization - bufferEnd.compareAndSet(b, segment.getId() * Segment.SEGMENT_SIZE); + BUFFER_END.compareAndSet(this, b, segment.getId() * Segment.SEGMENT_SIZE); // another buffer expansion already happened for this cell (in the removed segment) return; } @@ -660,17 +683,24 @@ public Object errorSafe(Throwable reason) { } private Object closeSafe(ChannelClosed channelClosed) { - if (!closedReason.compareAndSet(null, channelClosed)) { - return closedReason.get(); // already closed + if (!CLOSED_REASON.compareAndSet(this, null, channelClosed)) { + return closedReason; // already closed } // after this completes, it's guaranteed than no sender with `s >= lastSender` will proceed with the usual // sending algorithm, as `send()` will observe that the channel is closed - var scf = sendersAndClosedFlag.updateAndGet(this::setClosedFlag); + long scf; + var scfUpdated = false; + do { + var initialScf = sendersAndClosedFlag; + scf = setClosedFlag(initialScf); + scfUpdated = SENDERS_AND_CLOSE_FLAG.compareAndSet(this, initialScf, scf); + } while (!scfUpdated); + var lastSender = getSendersCounter(scf); // closing the segment chain guarantees that no new segment beyond `lastSegment` will be created - var lastSegment = sendSegment.get().close(); + var lastSegment = sendSegment.close(); if (channelClosed instanceof ChannelError) { // closing all cells, as this is an error @@ -741,7 +771,7 @@ private void updateCellClose(Segment segment, int i) { return; } } else if (state instanceof StoredSelectClause ss) { - ss.getSelect().channelClosed(ss, closedReason.get()); + ss.getSelect().channelClosed(ss, closedReason); // not setting the state & updating counters, as each non-selected stored select cell will be // cleaned up, setting an interrupted state (and informing the segment) // until this happens, other (concurrent) invocations of `channelClosed` or `trySelect` will still @@ -774,13 +804,13 @@ private void updateCellClose(Segment segment, int i) { @Override public ChannelClosed closedForSend() { - return isClosed(sendersAndClosedFlag.get()) ? closedReason.get() : null; + return isClosed(sendersAndClosedFlag) ? closedReason : null; } @Override public ChannelClosed closedForReceive() { - if (isClosed(sendersAndClosedFlag.get())) { - var cr = closedReason.get(); // cannot be null + if (isClosed(sendersAndClosedFlag)) { + var cr = closedReason; // cannot be null if (cr instanceof ChannelError) { return cr; } else { @@ -795,10 +825,10 @@ public ChannelClosed closedForReceive() { private boolean hasValuesToReceive() { while (true) { // reading the segment before the counter - this is needed to find the required segment later - var segment = receiveSegment.get(); + var segment = receiveSegment; // r is the cell which will be used by the next receive - var r = receivers.get(); - var s = getSendersCounter(sendersAndClosedFlag.get()); + var r = receivers; + var s = getSendersCounter(sendersAndClosedFlag); if (s <= r) { // for sure, nothing is buffered / no senders are waiting @@ -811,7 +841,7 @@ private boolean hasValuesToReceive() { // check if `receiveSegment` stores a previous segment, if so move the reference forward if (segment.getId() != id) { - segment = findAndMoveForward(receiveSegment, segment, id); + segment = findAndMoveForward(RECEIVE_SEGMENT, this, segment, id); if (segment == null) { // the channel has been closed, r points to a segment which doesn't exist return false; @@ -820,7 +850,7 @@ private boolean hasValuesToReceive() { // if we still have another segment, the segment must have been removed if (segment.getId() != id) { // skipping all interrupted cells, and trying with a new one - receivers.compareAndSet(r, segment.getId() * Segment.SEGMENT_SIZE); + RECEIVERS.compareAndSet(this, r, segment.getId() * Segment.SEGMENT_SIZE); continue; } } @@ -832,7 +862,7 @@ private boolean hasValuesToReceive() { return true; } else { // nothing to receive, we can (try to, if not already done) bump the counter and try again - receivers.compareAndSet(r, r + 1); + RECEIVERS.compareAndSet(this, r, r + 1); } } } @@ -964,35 +994,35 @@ void cleanupStoredSelectClause(Segment segment, int i, boolean isSender) { private static final int SENDERS_AND_CLOSED_FLAG_SHIFT = 60; private static final long SENDERS_COUNTER_MASK = (1L << SENDERS_AND_CLOSED_FLAG_SHIFT) - 1; - private long getSendersCounter(long sendersAndClosedFlag) { + private static long getSendersCounter(long sendersAndClosedFlag) { return sendersAndClosedFlag & SENDERS_COUNTER_MASK; } - private boolean isClosed(long sendersAndClosedFlag) { + private static boolean isClosed(long sendersAndClosedFlag) { return sendersAndClosedFlag >> SENDERS_AND_CLOSED_FLAG_SHIFT == 1; } - private long setClosedFlag(long sendersAndClosedFlag) { + private static long setClosedFlag(long sendersAndClosedFlag) { return sendersAndClosedFlag | (1L << SENDERS_AND_CLOSED_FLAG_SHIFT); } @Override public String toString() { //noinspection OptionalGetWithoutIsPresent - var smallestSegment = Stream.of(sendSegment.get(), receiveSegment.get(), bufferEndSegment.get()) + var smallestSegment = Stream.of(sendSegment, receiveSegment, bufferEndSegment) .filter(s -> s != Segment.NULL_SEGMENT) .min(Comparator.comparingLong(Segment::getId)).get(); - var scf = sendersAndClosedFlag.get(); + var scf = sendersAndClosedFlag; var sendersCounter = getSendersCounter(scf); var isClosed = isClosed(scf); var sb = new StringBuilder(); sb.append("Channel(capacity=").append(capacity) .append(", closed=").append(isClosed) - .append(", sendSegment=").append(sendSegment.get().getId()).append(", sendCounter=").append(sendersCounter) - .append(", receiveSegment=").append(receiveSegment.get().getId()).append(", receiveCounter=").append(receivers.get()) - .append(", bufferEndSegment=").append(bufferEndSegment.get().getId()).append(", bufferEndCounter=").append(bufferEnd.get()) + .append(", sendSegment=").append(sendSegment.getId()).append(", sendCounter=").append(sendersCounter) + .append(", receiveSegment=").append(receiveSegment.getId()).append(", receiveCounter=").append(receivers) + .append(", bufferEndSegment=").append(bufferEndSegment.getId()).append(", bufferEndCounter=").append(bufferEnd) .append("): \n"); var s = smallestSegment; while (s != null) { diff --git a/core/src/main/java/com/softwaremill/jox/Segment.java b/core/src/main/java/com/softwaremill/jox/Segment.java index e846d2e..f7fae7b 100644 --- a/core/src/main/java/com/softwaremill/jox/Segment.java +++ b/core/src/main/java/com/softwaremill/jox/Segment.java @@ -1,8 +1,7 @@ package com.softwaremill.jox; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicReference; -import java.util.concurrent.atomic.AtomicReferenceArray; +import java.lang.invoke.MethodHandles; +import java.lang.invoke.VarHandle; final class Segment { /* @@ -25,26 +24,52 @@ private enum State { CLOSED } + // immutable state + private final long id; - private final AtomicReferenceArray data = new AtomicReferenceArray<>(SEGMENT_SIZE); + private final boolean isRendezvousOrUnlimited; + + // mutable state + + private final Object[] data = new Object[SEGMENT_SIZE]; /** * Possible values: {@code Segment} or {@code State.CLOSED} (union type). */ - private final AtomicReference next = new AtomicReference<>(null); - private final AtomicReference prev; + private volatile Object next = null; + private volatile Segment prev; /** * A single counter that can be inspected & modified atomically, which includes: * - the number of incoming pointers (shifted by {@link Segment#POINTERS_SHIFT} to the left) * - the number of cells, which haven't been interrupted & processed yet (in the first 6 bits) * When this reaches 0, the segment is logically removed. */ - private final AtomicInteger pointers_notProcessedAndInterrupted; - private final boolean isRendezvousOrUnlimited; + private volatile int pointers_notProcessedAndInterrupted; + + // var handles for mutable state + + private static final VarHandle DATA; + private static final VarHandle NEXT; + private static final VarHandle PREV; + private static final VarHandle POINTERS_NOT_PROCESSED_AND_INTERRUPTED; + + static { + try { + MethodHandles.Lookup l = MethodHandles.privateLookupIn(Segment.class, MethodHandles.lookup()); + DATA = MethodHandles.arrayElementVarHandle(Object[].class); + NEXT = l.findVarHandle(Segment.class, "next", Object.class); + PREV = l.findVarHandle(Segment.class, "prev", Segment.class); + POINTERS_NOT_PROCESSED_AND_INTERRUPTED = l.findVarHandle(Segment.class, "pointers_notProcessedAndInterrupted", int.class); + } catch (ReflectiveOperationException e) { + throw new ExceptionInInitializerError(e); + } + } + + // Segment(long id, Segment prev, int pointers, boolean isRendezvousOrUnlimited) { this.id = id; - this.prev = new AtomicReference<>(prev); - this.pointers_notProcessedAndInterrupted = new AtomicInteger(SEGMENT_SIZE + (pointers << POINTERS_SHIFT)); + this.prev = prev; + this.pointers_notProcessedAndInterrupted = SEGMENT_SIZE + (pointers << POINTERS_SHIFT); this.isRendezvousOrUnlimited = isRendezvousOrUnlimited; } @@ -53,32 +78,32 @@ long getId() { } void cleanPrev() { - prev.set(null); + prev = null; } Segment getNext() { - var s = next.get(); + var s = next; return s == State.CLOSED ? null : (Segment) s; } Segment getPrev() { - return prev.get(); + return prev; } private boolean setNextIfNull(Segment setTo) { - return next.compareAndSet(null, setTo); + return NEXT.compareAndSet(this, null, setTo); } Object getCell(int index) { - return data.get(index); + return DATA.getVolatile(data, index); } void setCell(int index, Object value) { - data.set(index, value); + DATA.setVolatile(data, index, value); } boolean casCell(int index, Object expected, Object newValue) { - return data.compareAndSet(index, expected, newValue); + return DATA.compareAndSet(data, index, expected, newValue); } private boolean isTail() { @@ -90,7 +115,7 @@ private boolean isTail() { * have been interrupted & processed. */ boolean isRemoved() { - return pointers_notProcessedAndInterrupted.get() == 0; + return pointers_notProcessedAndInterrupted == 0; } /** @@ -101,11 +126,11 @@ boolean isRemoved() { boolean tryIncPointers() { int p; do { - p = pointers_notProcessedAndInterrupted.get(); + p = pointers_notProcessedAndInterrupted; if (p == 0) { return false; } - } while (!pointers_notProcessedAndInterrupted.compareAndSet(p, p + (1 << POINTERS_SHIFT))); + } while (!POINTERS_NOT_PROCESSED_AND_INTERRUPTED.compareAndSet(this, p, p + (1 << POINTERS_SHIFT))); return true; } @@ -115,7 +140,15 @@ boolean tryIncPointers() { * @return {@code true} if the segment becomes logically removed. */ boolean decPointers() { - return pointers_notProcessedAndInterrupted.updateAndGet(p -> p - (1 << POINTERS_SHIFT)) == 0; + // pointers_notProcessedAndInterrupted.updateAndGet(p -> p - (1 << POINTERS_SHIFT)) == 0 + var toAdd = -(1 << POINTERS_SHIFT); + while (true) { + var currentP = pointers_notProcessedAndInterrupted; + var updated = POINTERS_NOT_PROCESSED_AND_INTERRUPTED.compareAndSet(this, currentP, currentP + toAdd); + if (updated) { + return currentP + toAdd == 0; // is the new result 0? + } + } } /** @@ -124,7 +157,8 @@ boolean decPointers() { * Should be called at most once for each cell. Removes the segment, if it becomes logically removed. */ void cellInterruptedReceiver() { - if (pointers_notProcessedAndInterrupted.decrementAndGet() == 0) remove(); + // decrementAndGet() == 0 + if ((int) POINTERS_NOT_PROCESSED_AND_INTERRUPTED.getAndAdd(this, -1) == 1) remove(); } /** @@ -135,7 +169,8 @@ void cellInterruptedReceiver() { void cellInterruptedSender() { // in rendezvous/unlimited channels, cells are immediately processed when interrupted if (isRendezvousOrUnlimited) { - if (pointers_notProcessedAndInterrupted.decrementAndGet() == 0) remove(); + // decrementAndGet() == 0 + if ((int) POINTERS_NOT_PROCESSED_AND_INTERRUPTED.getAndAdd(this, -1) == 1) remove(); } } @@ -145,7 +180,8 @@ void cellInterruptedSender() { * Should be called at most once for each cell. Removes the segment, if it becomes logically removed. */ void cellProcessed() { - if (pointers_notProcessedAndInterrupted.decrementAndGet() == 0) remove(); + // decrementAndGet() == 0 + if ((int) POINTERS_NOT_PROCESSED_AND_INTERRUPTED.getAndAdd(this, -1) == 1) remove(); } /** @@ -162,8 +198,19 @@ void remove() { var _next = aliveSegmentRight(); // link next and prev - _next.prev.updateAndGet(p -> p == null ? null : _prev); - if (_prev != null) _prev.next.set(_next); + // _next.prev.update(p -> p == null ? null : _prev); + var prevOfNextUpdated = false; + do { + var currentPrevOfNext = _next.prev; + if (currentPrevOfNext == null) { + // leaving null + prevOfNextUpdated = true; + } else { + prevOfNextUpdated = PREV.compareAndSet(_next, currentPrevOfNext, _prev); + } + + } while (!prevOfNextUpdated); + if (_prev != null) _prev.next = _next; // double-checking if _prev & _next are still not removed if (_next.isRemoved() && !_next.isTail()) continue; @@ -180,9 +227,9 @@ void remove() { Segment close() { var s = this; while (true) { - var n = s.next.get(); + var n = s.next; if (n == null) { // this is the tail segment - if (s.next.compareAndSet(null, State.CLOSED)) { + if (NEXT.compareAndSet(s, null, State.CLOSED)) { return s; } // else: try again @@ -195,9 +242,9 @@ Segment close() { } private Segment aliveSegmentLeft() { - var s = prev.get(); + var s = prev; while (s != null && s.isRemoved()) { - s = s.prev.get(); + s = s.prev; } return s; } @@ -206,9 +253,9 @@ private Segment aliveSegmentLeft() { * Should only be called, if this is not the tail segment. */ private Segment aliveSegmentRight() { - var n = (Segment) next.get(); // this is not the tail, so there's a next segment + var n = (Segment) next; // this is not the tail, so there's a next segment while (n.isRemoved() && !n.isTail()) { - n = (Segment) n.next.get(); // again, not tail + n = (Segment) n.next; // again, not tail } return n; } @@ -221,13 +268,13 @@ private Segment aliveSegmentRight() { * * @return The found segment, or {@code null} if the segment chain is closed. */ - static Segment findAndMoveForward(AtomicReference ref, Segment start, long id) { + static Segment findAndMoveForward(VarHandle segmentVarHandle, Object segmentThis, Segment start, long id) { while (true) { var segment = findSegment(start, id); if (segment == null) { return null; } - if (moveForward(ref, segment)) { + if (moveForward(segmentVarHandle, segmentThis, segment)) { return segment; } } @@ -242,7 +289,7 @@ static Segment findAndMoveForward(AtomicReference ref, Segment start, l private static Segment findSegment(Segment start, long id) { var current = start; while (current.getId() < id || current.isRemoved()) { - var n = current.next.get(); + var n = current.next; if (n == State.CLOSED) { // segment chain is closed, so we can't create a new segment return null; @@ -271,9 +318,9 @@ private static Segment findSegment(Segment start, long id) { * @param to The segment to move the referenced segment to. * @return {@code true} if the move was successful, or a newer segment is already set, {@code false} otherwise. */ - private static boolean moveForward(AtomicReference ref, Segment to) { + private static boolean moveForward(VarHandle segmentVarHandle, Object segmentThis, Segment to) { while (true) { - var current = ref.get(); + var current = (Segment) segmentVarHandle.getVolatile(segmentThis); // the send segment might be already updated if (current.getId() >= to.getId()) { return true; @@ -283,7 +330,7 @@ private static boolean moveForward(AtomicReference ref, Segment to) { return false; } // try to update the ref - if (ref.compareAndSet(current, to)) { + if (segmentVarHandle.compareAndSet(segmentThis, current, to)) { // decrement pointers incoming to `to`, as it's no longer referenced via ref if (current.decPointers()) { current.remove(); @@ -301,9 +348,9 @@ private static boolean moveForward(AtomicReference ref, Segment to) { @Override public String toString() { - var n = next.get(); - var p = prev.get(); - var c = pointers_notProcessedAndInterrupted.get(); + var n = next; + var p = prev; + var c = pointers_notProcessedAndInterrupted; var notProcessedAndInterrupted = (c & ((1 << POINTERS_SHIFT) - 1)); var pointers = c >> POINTERS_SHIFT; @@ -320,6 +367,6 @@ public String toString() { // for tests void setNext(Segment newNext) { - next.set(newNext); + NEXT.set(this, newNext); } } diff --git a/core/src/main/java/com/softwaremill/jox/Select.java b/core/src/main/java/com/softwaremill/jox/Select.java index 1eb9489..daf28c8 100644 --- a/core/src/main/java/com/softwaremill/jox/Select.java +++ b/core/src/main/java/com/softwaremill/jox/Select.java @@ -1,7 +1,8 @@ package com.softwaremill.jox; +import java.lang.invoke.MethodHandles; +import java.lang.invoke.VarHandle; import java.util.*; -import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.LockSupport; import java.util.function.Supplier; @@ -145,7 +146,18 @@ class SelectInstance { * - a {@link List} of clauses to re-register * - when selected, {@link SelectClause} (during registration) or {@link StoredSelectClause} (with suspension) */ - private final AtomicReference state = new AtomicReference<>(SelectState.REGISTERING); + private volatile Object state = SelectState.REGISTERING; + + private static final VarHandle STATE; + + static { + try { + MethodHandles.Lookup l = MethodHandles.privateLookupIn(SelectInstance.class, MethodHandles.lookup()); + STATE = l.findVarHandle(SelectInstance.class, "state", Object.class); + } catch (ReflectiveOperationException e) { + throw new ExceptionInInitializerError(e); + } + } /** * The content of the list will be written & read only by the main select thread. Hence, no synchronization is necessary. @@ -182,7 +194,7 @@ boolean register(SelectClause clause) { // when setting the state, we might override another state: // - a list of clauses to re-register - there's no point in doing that anyway (since the channel is closed) // - another closed state (set concurrently) - state.set(cc); + state = cc; return false; } else { // else: the clause was selected @@ -190,7 +202,7 @@ boolean register(SelectClause clause) { // when setting the state, we might override another state: // - a list of clauses to re-register - there's no point in doing that anyway (since we already selected a clause) // - a closed state - the closure must have happened concurrently with registration; we give priority to immediate selects then - state.set(clause); + state = clause; return false; } } @@ -203,14 +215,14 @@ boolean register(SelectClause clause) { */ Object checkStateAndWait() throws InterruptedException { while (true) { - var currentState = state.get(); + var currentState = state; if (currentState == SelectState.REGISTERING) { // registering done, waiting until a clause is selected - setting the thread to wake up as the state // we won't leave this case until the state is changed from Thread var currentThread = Thread.currentThread(); - if (state.compareAndSet(SelectState.REGISTERING, currentThread)) { + if (STATE.compareAndSet(this, SelectState.REGISTERING, currentThread)) { var spinIterations = Continuation.SPINS; - while (state.get() == currentThread) { + while (state == currentThread) { // same logic as in Continuation if (spinIterations > 0) { Thread.onSpinWait(); @@ -219,7 +231,7 @@ Object checkStateAndWait() throws InterruptedException { LockSupport.park(); if (Thread.interrupted()) { - if (state.compareAndSet(currentThread, SelectState.INTERRUPTED)) { + if (STATE.compareAndSet(this, currentThread, SelectState.INTERRUPTED)) { // since we changed the state, we know that none of the clauses will become completed cleanup(null); throw new InterruptedException(); @@ -236,7 +248,7 @@ Object checkStateAndWait() throws InterruptedException { // else: CAS unsuccessful, retry } else if (currentState instanceof List) { // moving the state back to registering - if (state.compareAndSet(currentState, SelectState.REGISTERING)) { + if (STATE.compareAndSet(this, currentState, SelectState.REGISTERING)) { //noinspection unchecked for (var clause : (List>) currentState) { // cleaning up & removing the stored select for the clause which we'll re-register @@ -301,9 +313,9 @@ private void cleanup(SelectClause selected) { */ boolean trySelect(StoredSelectClause storedSelectClause) { while (true) { - var currentState = state.get(); + var currentState = state; if (currentState == SelectState.REGISTERING) { - if (state.compareAndSet(currentState, Collections.singletonList(storedSelectClause.getClause()))) { + if (STATE.compareAndSet(this, currentState, Collections.singletonList(storedSelectClause.getClause()))) { return false; // concurrent clause selection is not possible during registration } // else: CAS unsuccessful, retry @@ -313,7 +325,7 @@ boolean trySelect(StoredSelectClause storedSelectClause) { //noinspection unchecked newClausesToReRegister.addAll((Collection>) clausesToReRegister); newClausesToReRegister.add(storedSelectClause.getClause()); - if (state.compareAndSet(currentState, newClausesToReRegister)) { + if (STATE.compareAndSet(this, currentState, newClausesToReRegister)) { return false; // concurrent clause selection is not possible during registration } // else: CAS unsuccessful, retry @@ -324,7 +336,7 @@ boolean trySelect(StoredSelectClause storedSelectClause) { // already selected, will be cleaned up soon return false; } else if (currentState instanceof Thread t) { - if (state.compareAndSet(currentState, storedSelectClause)) { + if (STATE.compareAndSet(this, currentState, storedSelectClause)) { LockSupport.unpark(t); return true; } @@ -343,16 +355,16 @@ boolean trySelect(StoredSelectClause storedSelectClause) { void channelClosed(StoredSelectClause storedSelectClause, ChannelClosed channelClosed) { while (true) { - var currentState = state.get(); + var currentState = state; if (currentState == SelectState.REGISTERING) { // the channel closed state will be discovered when there's a call to `checkStateAndWait` after registration completes - if (state.compareAndSet(currentState, channelClosed)) { + if (STATE.compareAndSet(this, currentState, channelClosed)) { return; } // else: CAS unsuccessful, retry } else if (currentState instanceof List) { // same as above - if (state.compareAndSet(currentState, channelClosed)) { + if (STATE.compareAndSet(this, currentState, channelClosed)) { return; } // else: CAS unsuccessful, retry @@ -363,7 +375,7 @@ void channelClosed(StoredSelectClause storedSelectClause, ChannelClosed channelC // already selected return; } else if (currentState instanceof Thread t) { - if (state.compareAndSet(currentState, channelClosed)) { + if (STATE.compareAndSet(this, currentState, channelClosed)) { LockSupport.unpark(t); return; } diff --git a/core/src/test/java/com/softwaremill/jox/SegmentRendezvousTest.java b/core/src/test/java/com/softwaremill/jox/SegmentRendezvousTest.java index 3be83fd..d7a42a0 100644 --- a/core/src/test/java/com/softwaremill/jox/SegmentRendezvousTest.java +++ b/core/src/test/java/com/softwaremill/jox/SegmentRendezvousTest.java @@ -2,9 +2,10 @@ import org.junit.jupiter.api.Test; +import java.lang.invoke.MethodHandles; +import java.lang.invoke.VarHandle; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutionException; -import java.util.concurrent.atomic.AtomicReference; import static com.softwaremill.jox.Segment.SEGMENT_SIZE; import static com.softwaremill.jox.SegmentTest.createSegmentChain; @@ -16,6 +17,19 @@ public class SegmentRendezvousTest { // rendezvous segment = where countProcessed is false + // a field which we use for testing where VarHandle is required + private Segment someSegment; + private static VarHandle SOME_SEGMENT; + + static { + try { + MethodHandles.Lookup l = MethodHandles.privateLookupIn(SegmentRendezvousTest.class, MethodHandles.lookup()); + SOME_SEGMENT = l.findVarHandle(SegmentRendezvousTest.class, "someSegment", Segment.class); + } catch (ReflectiveOperationException e) { + throw new ExceptionInInitializerError(e); + } + } + @Test void segmentShouldBecomeRemovedOnceAllCellsInterrupted() { // given @@ -184,16 +198,16 @@ void shouldNotResurrectAnUnlikedSegment() { void shouldFindAndMoveSegmentReferenceForward() { // given var s = createSegmentChain(4, 0, true); - var r = new AtomicReference<>(s[0]); + someSegment = s[0]; // when - var result = Segment.findAndMoveForward(r, s[0], 2); + var result = Segment.findAndMoveForward(SOME_SEGMENT, this, s[0], 2); // then assertEquals(s[2], result); // when - result = Segment.findAndMoveForward(r, s[0], 5); + result = Segment.findAndMoveForward(SOME_SEGMENT, this, s[0], 5); assertEquals(5, result.getId()); assertEquals(result, s[3].getNext().getNext()); } @@ -202,30 +216,30 @@ void shouldFindAndMoveSegmentReferenceForward() { void shouldMoveReferenceForwardIfClosedAndFoundSegmentExists() { // given var s = createSegmentChain(4, 0, true); - var r = new AtomicReference<>(s[0]); + someSegment = s[0]; // when s[0].close(); - var result = Segment.findAndMoveForward(r, s[0], 3); + var result = Segment.findAndMoveForward(SOME_SEGMENT, this, s[0], 3); // then assertEquals(s[3], result); - assertEquals(s[3], r.get()); + assertEquals(s[3], someSegment); } @Test void shouldNotMoveReferenceForwardIfClosedAndFoundSegmentDoesNotExist() { // given var s = createSegmentChain(4, 0, true); - var r = new AtomicReference<>(s[0]); + someSegment = s[0]; // when s[0].close(); - var result = Segment.findAndMoveForward(r, s[0], 5); + var result = Segment.findAndMoveForward(SOME_SEGMENT, this, s[0], 5); // then assertNull(result); - assertEquals(s[0], r.get()); + assertEquals(s[0], someSegment); } @Test @@ -241,7 +255,8 @@ void shouldRemoveOldTailSegment() { assertEquals(ss[0].getNext(), ss[1]); // logically, but not physically removed // when - var s2 = Segment.findAndMoveForward(new AtomicReference<>(ss[0]), ss[0], 2); + someSegment = ss[0]; + var s2 = Segment.findAndMoveForward(SOME_SEGMENT, this, ss[0], 2); // then assertEquals(s2, ss[0].getNext()); @@ -255,7 +270,8 @@ void shouldReturnNextSegmentIfRemoved() { sendInterruptAllCells(ss[1]); // when - var result = Segment.findAndMoveForward(new AtomicReference<>(ss[0]), ss[0], 1); + someSegment = ss[0]; + var result = Segment.findAndMoveForward(SOME_SEGMENT, this, ss[0], 1); // then assertEquals(ss[2], result); @@ -265,31 +281,31 @@ void shouldReturnNextSegmentIfRemoved() { void shouldNotUpdateSegmentReferenceIfAlreadyUpdated() { // given var ss = createSegmentChain(3, 0, true); - var ref = new AtomicReference<>(ss[2]); + someSegment = ss[2]; // when - var result = Segment.findAndMoveForward(ref, ss[0], 1); + var result = Segment.findAndMoveForward(SOME_SEGMENT, this, ss[0], 1); // then assertEquals(ss[1], result); - assertEquals(ss[2], ref.get()); + assertEquals(ss[2], someSegment); } @Test void shouldUpdateSegmentPointersWhenReferenceChanges() { // given var ss = createSegmentChain(3, 0, true); - var ref = new AtomicReference<>(ss[0]); + someSegment = ss[0]; // when - Segment.findAndMoveForward(ref, ss[0], 1); + Segment.findAndMoveForward(SOME_SEGMENT, this, ss[0], 1); // then sendInterruptAllCells(ss[1]); assertFalse(ss[1].isRemoved()); // shouldn't be removed because there's an incoming pointer // when - Segment.findAndMoveForward(ref, ss[0], 2); + Segment.findAndMoveForward(SOME_SEGMENT, this, ss[0], 2); // then assertTrue(ss[1].isRemoved()); // no more pointers -> logically removed @@ -302,7 +318,7 @@ void shouldConcurrentlyMoveSegmentsForward() throws ExecutionException, Interrup // given for (int k = 0; k < 1000; k++) { var ss = createSegmentChain(1, 0, true); - var ref = new AtomicReference<>(ss[0]); + someSegment = ss[0]; var observedSegments = new ConcurrentHashMap(); // when @@ -312,7 +328,7 @@ void shouldConcurrentlyMoveSegmentsForward() throws ExecutionException, Interrup forkVoid(scope, () -> { var s = ss[0]; for (int i = 0; i < 300; i++) { - s = Segment.findAndMoveForward(ref, s, i); + s = Segment.findAndMoveForward(SOME_SEGMENT, this, s, i); var previous = observedSegments.put(i, s); if (previous != s && previous != null) { fail("Already observed segment: " + previous + " for id: " + i + ", but found: " + s);