Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cleanup send segments when closed, fix segment estimate in test #47

Merged
merged 2 commits into from
Feb 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 7 additions & 3 deletions core/src/main/java/com/softwaremill/jox/Channel.java
Original file line number Diff line number Diff line change
Expand Up @@ -181,9 +181,6 @@ private Object doSend(T value, SelectInstance select, SelectClause<?> selectClau
var segment = sendSegment.get();
// reserving the next cell
var scf = sendersAndClosedFlag.getAndIncrement();
if (isClosed(scf)) {
return closedReason.get();
}
var s = getSendersCounter(scf);

// calculating the segment id and the index within the segment
Expand All @@ -206,6 +203,13 @@ private Object doSend(T value, SelectInstance select, SelectClause<?> selectClau
}
}

// performing the check only now, as even if the channel is closed, we want to move the send segment
// reference forward, so that segments which become eligible for removal can be GCed (after the channel
// is closed, e.g. when the channel is done and there are some values left to be received)
if (isClosed(scf)) {
return closedReason.get();
}

var sendResult = updateCellSend(segment, i, s, value, select, selectClause);
if (sendResult == SendResult.BUFFERED) {
// a receiver is coming, or we are in buffer
Expand Down
16 changes: 12 additions & 4 deletions core/src/test/java/com/softwaremill/jox/StressTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -97,10 +97,18 @@ private void testAndVerify(int capacity, boolean direct) throws Exception {

for (var ch : chs) {
var segments = countOccurrences(ch.toString(), "Segment{");
// +1, as the buffer might be entirely in the next segment
// and another +1, as the send segment might not be moved forward, if in the next segment there are only IRs
var expectedSegments = Math.ceil((double) capacity / Segment.SEGMENT_SIZE) + 2;
assertTrue(segments <= expectedSegments, "there can be at most as much segments as needed to store the buffer + 1, but got: " + segments + " instead of " + expectedSegments + ".");

// In a worst-case scenario, the first thread might close the channel (using `done()`): this prevents the
// `sendSegment` reference from advancing. All other threads might have started a `send()` just before this,
// so the number of in-flight elements, waiting to be received (at the moment of calling `done()`) is
// `numberOfThreads + bufferSize`. Each element might have a separate segment (as all other cells might be
// interrupted/broken), so this also is a theoretical number of segments. When there are no more `send()`s,
// only `receive()`s, the `sendSegment` won't ever advance.
// This needs to be incremented by the number of in-buffer cells. Additionally, they might only start in the
// next segment - if there's a buffer.
// And another +1, as the tail segment might consist of IRs only.
var maxSegments = numberOfThreads + capacity + Math.ceil((double) capacity / Segment.SEGMENT_SIZE) + (capacity > 0 ? 1 : 0) + 1;
assertTrue(segments <= maxSegments, "got: " + segments + " instead of " + maxSegments + ".");
}
});
} catch (Exception e) {
Expand Down
Loading