Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement io flows methods #78

Merged
merged 2 commits into from
Jan 2, 2025
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package com.softwaremill.jox.flows;

import java.util.Iterator;
import java.util.NoSuchElementException;

class ByteArrayIterator implements Iterator<Byte> {
private final byte[] array;
private int position = 0;

public ByteArrayIterator(byte[] array) {
this.array = array;
}

@Override
public boolean hasNext() {
return position < array.length;
}

@Override
public Byte next() {
if (!hasNext()) {
throw new NoSuchElementException();
}
return array[position++];
}

public int available() {
return array.length - position;
}

public static ByteArrayIterator empty() {
return new ByteArrayIterator(new byte[0]);
}
}
131 changes: 130 additions & 1 deletion flows/src/main/java/com/softwaremill/jox/flows/Flow.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,15 @@
import static com.softwaremill.jox.structured.Scopes.unsupervised;
import static java.lang.Thread.sleep;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.channels.SeekableByteChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
Expand Down Expand Up @@ -588,7 +597,6 @@ case ChannelError(Throwable cause):
if (!buffer.isEmpty()) {
sendBufferAndCleanupCost.call();
// cancel existing timeout and start a new one
if (timeoutFork != null) timeoutFork.cancelNow();
emil-bar marked this conversation as resolved.
Show resolved Hide resolved
timeoutFork = forkTimeout(scope, timerChannel, duration);
}
yield true;
Expand Down Expand Up @@ -1161,6 +1169,127 @@ public Flow<T> alsoToTap(Sink<T> other) {
});
}

/**
* Runs the flow into a {@link java.io.InputStream}.
* <p>
* Must be run within a concurrency scope, as under the hood the flow is run in the background.
* <p>
* Buffer capacity can be set via scoped value {@link Channel#BUFFER_SIZE}. If not specified in scope, {@link Channel#DEFAULT_BUFFER_SIZE} is used.
*/
public InputStream runToInputStream(UnsupervisedScope scope) {
Source<byte[]> ch = this
.map(t -> {
if (t instanceof byte[] bytes) {
return bytes;
} else {
throw new IllegalArgumentException("requirement failed: method can be called only on flow containing byte[]");
}
})
.runToChannel(scope);

return new InputStream() {
private ByteArrayIterator currentChunk = ByteArrayIterator.empty();

@Override
public int read() {
try {
if (!currentChunk.hasNext()) {
Object result = ch.receiveOrClosed();
if (result instanceof ChannelDone) {
return -1;
} else if (result instanceof ChannelError error) {
throw error.toException();
} else {
byte[] chunk = (byte[]) result;
currentChunk = new ByteArrayIterator(chunk);
}
}
return currentChunk.next() & 0xff; // Convert to unsigned
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}

@Override
public int available() {
return currentChunk.available();
}
};
}

/**
* Writes content of this flow to an {@link java.io.OutputStream}.
*
* @param outputStream
* Target `OutputStream` to write to. Will be closed after finishing the process or on error.
*/
public void runToOutputStream(OutputStream outputStream) throws Exception {
try {
last.run(t -> {
if (t instanceof byte[] chunk) {
outputStream.write(chunk);
} else {
throw new IllegalArgumentException("requirement failed: method can be called only on flow containing byte[]");
}
});
close(outputStream, null);
} catch (Exception e) {
close(outputStream, e);
throw e;
}
}

/** Writes content of this flow to a file.
*
* @param path
* Path to the target file. If not exists, it will be created.s
*/
public void runToFile(Path path) throws Exception {
if (Files.isDirectory(path)) {
throw new IOException("Path %s is a directory".formatted(path));
}
final SeekableByteChannel channel = getFileChannel(path);
try {
map(t -> {
if (t instanceof byte[] chunk) {
return chunk;
} else {
throw new IllegalArgumentException("requirement failed: method can be called only on flow containing byte[]");
}
}).runForeach(chunk -> {
try {
channel.write(ByteBuffer.wrap(chunk));
} catch (IOException e) {
throw new RuntimeException(e);
}
});
close(channel, null);
} catch (Exception t) {
close(channel, t);
throw t;
}
}

private SeekableByteChannel getFileChannel(Path path) throws IOException {
try {
return FileChannel.open(path, StandardOpenOption.WRITE, StandardOpenOption.CREATE);
} catch (UnsupportedOperationException e) {
// Some file systems don't support file channels
return Files.newByteChannel(path, StandardOpenOption.WRITE, StandardOpenOption.CREATE);
}
}

private void close(AutoCloseable closeable, Exception cause) throws Exception {
try {
closeable.close();
} catch (IOException e) {
if (cause != null) {
cause.addSuppressed(e);
}
throw cause != null ? cause : e;
}
}

// endregion

private void forkPropagate(UnsupervisedScope unsupervisedScope, Sink<?> propagateExceptionsTo, Callable<Void> runnable) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ void groupedWithin_shouldGroupFirstBatchOfElementsDueToTimeoutAndSecondBatchDueT
// when
var elementsWithEmittedTimeOffset = Flows.fromSource(c)
.groupedWithin(3, Duration.ofMillis(100))
.map(s -> new AbstractMap.SimpleEntry<>(s, Duration.ofNanos(System.nanoTime() - start)))
.map(s -> Map.entry(s, Duration.ofNanos(System.nanoTime() - start)))
.runToList();

// then
Expand Down
Loading
Loading