Skip to content

Commit

Permalink
Implement ByteChunk and migrate methods to use it. (#80)
Browse files Browse the repository at this point in the history
  • Loading branch information
emil-bar authored Jan 9, 2025
1 parent aa4f8a9 commit 7700513
Show file tree
Hide file tree
Showing 10 changed files with 772 additions and 385 deletions.
143 changes: 143 additions & 0 deletions flows/src/main/java/com/softwaremill/jox/flows/ByteChunk.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
package com.softwaremill.jox.flows;

import java.nio.charset.Charset;
import java.util.Arrays;
import java.util.Iterator;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.IntStream;

/**
* A simple wrapper class for byte array which purpose is to simply operations and processing.
* ByteChunk is immutable.
*/
public class ByteChunk {

private final byte[] array;

public ByteChunk(byte[] array) {
this.array = array;
}

public static ByteChunk fromArray(byte[] array) {
return new ByteChunk(array);
}

public static ByteChunk empty() {
return new ByteChunk(new byte[0]);
}

/**
* @param idx index of the element
* @return element positioned at the given index
*/
public byte get(int idx) {
if (idx < 0 || idx >= array.length) {
throw new IndexOutOfBoundsException("Index %d out of bounds for array of length %d".formatted(idx, array.length));
}
return array[idx];
}

/**
* @return Chunk converted to String using given charset
*/
public String convertToString(Charset charset) {
return new String(toArray(), charset);
}

/**
* @return Iterator over the elements of the Chunk
*/
public Iterator<Byte> iterator() {
return IntStream.range(0, array.length)
.mapToObj(i -> array[i])
.iterator();
}

/**
* @return number of elements in the Chunk
*/
public int length() {
return array.length;
}

/**
* Takes n elements from the beginning of the Chunk and returns copy of the result
*/
public ByteChunk take(int n) {
return new ByteChunk(Arrays.copyOf(array, n));
}

/**
* Drops n elements from the beginning of the array and returns copy of the result
*/
public ByteChunk drop(int n) {
return new ByteChunk(Arrays.copyOfRange(array, n, array.length));
}

/**
* @param idx index at which to split the Chunk
* @return Pair of Chunks, first containing elements from 0 to idx (exclusive), second containing elements from idx to the end
*/
public Map.Entry<ByteChunk, ByteChunk> splitAt(int idx) {
return Map.entry(take(idx), drop(idx));
}

/**
* @param other Chunk to concatenate with this
* @return new Chunk containing elements of this and other
*/
public ByteChunk concat(ByteChunk other) {
return ByteChunk.fromArray(concatArrays(toArray(), other.toArray()));
}

/**
* @param condition function that returns true for element that should be found
* @return index of the first element that satisfies the condition, or -1 if no such element is found
*/
public int indexWhere(Function<Byte, Boolean> condition) {
Iterator<Byte> iterator = iterator();
int i = 0;
while (iterator.hasNext()) {
Byte item = iterator.next();
if (condition.apply(item)) {
return i;
}
i++;
}
return -1;
}

/**
* @return copy of the Chunk as an array
*/
public byte[] toArray() {
return Arrays.copyOf(array, array.length);
}

/**
* Checks if this starts with chunk b
*/
public boolean startsWith(ByteChunk other) {
return Arrays.equals(this.take(other.length()).toArray(), other.toArray());
}

@Override
public int hashCode() {
return Arrays.hashCode(array);
}

@Override
public boolean equals(Object obj) {
if (this == obj) return true;
if (obj == null || getClass() != obj.getClass()) return false;
ByteChunk that = (ByteChunk) obj;
return Arrays.equals(array, that.array);
}

private static byte[] concatArrays(byte[] first, byte[] second) {
byte[] result = Arrays.copyOf(first, first.length + second.length);
System.arraycopy(second, 0, result, first.length, second.length);
return result;
}
}
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
package com.softwaremill.jox.flows;

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicReference;
Expand All @@ -17,58 +15,53 @@
*/
class ChunksUtf8Decoder {
private static final int BOM_SIZE = 3; // const for UTF-8
private static final byte[] BOM_UTF8 = new byte[]{-17, -69, -65};
private static final ByteChunk BOM_UTF8 = ByteChunk.fromArray(new byte[]{-17, -69, -65});

public static <T> Flow<String> decodeStringUtf8(FlowStage<T> flowStage) {
public static Flow<String> decodeStringUtf8(FlowStage<ByteChunk> flowStage) {
return Flows.usingEmit(emit -> {
final AtomicReference<State> state = new AtomicReference<>(State.ProcessBOM);
final AtomicReference<byte[]> buffer = new AtomicReference<>(null);
final AtomicReference<ByteChunk> buffer = new AtomicReference<>(null);

flowStage.run(t -> {
if (!(t instanceof byte[] bytes)) {
throw new IllegalArgumentException("requirement failed: method can be called only on flow containing byte[]");
}
byte[] newBuffer;
ByteChunk newBuffer;
State newState;
if (state.get() == State.ProcessBOM) {
Map.Entry<byte[], State> processResult = processByteOrderMark(bytes, buffer.get());
Map.Entry<ByteChunk, State> processResult = processByteOrderMark(t, buffer.get());
newBuffer = processResult.getKey();
newState = processResult.getValue();
} else {
newBuffer = doPull(bytes, buffer.get(), emit);
newBuffer = doPull(t, buffer.get(), emit);
newState = State.Pull;
}
buffer.set(newBuffer);
state.set(newState);
});
// A common case, worth checking in advance

if (buffer.get() != null && buffer.get().length > 0) {
emit.apply(new String(buffer.get(), StandardCharsets.UTF_8));
if (buffer.get() != null && buffer.get().length() > 0) {
emit.apply(new String(buffer.get().toArray(), StandardCharsets.UTF_8));
}
});
}

private static Map.Entry<byte[], State> processByteOrderMark(byte[] bytes, byte[] buffer) {
private static Map.Entry<ByteChunk, State> processByteOrderMark(ByteChunk bytes, ByteChunk buffer) {
// A common case, worth checking in advance
if (buffer == null && bytes.length >= BOM_SIZE && startsWith(bytes, BOM_UTF8)) {
if (buffer == null && bytes.length() >= BOM_SIZE && bytes.startsWith(BOM_UTF8)) {
return Map.entry(bytes, State.Pull);
} else {
byte[] newBuffer0 = buffer == null ? new byte[0] : buffer;
byte[] newBuffer = Arrays.copyOf(newBuffer0, newBuffer0.length + bytes.length);
newBuffer = ByteBuffer.wrap(newBuffer).put(newBuffer0.length, bytes).array();
if (newBuffer.length >= BOM_SIZE) {
byte[] rem = startsWith(newBuffer, BOM_UTF8) ? drop(newBuffer, BOM_SIZE) : newBuffer;
var newBuffer = (buffer == null ? ByteChunk.empty() : buffer).concat(bytes);
if (newBuffer.length() >= BOM_SIZE) {
ByteChunk rem = newBuffer.startsWith(BOM_UTF8) ? newBuffer.drop(BOM_SIZE) : newBuffer;
return Map.entry(rem, State.Pull);
} else if (startsWith(newBuffer, take(BOM_UTF8, newBuffer.length))) {
} else if (newBuffer.startsWith(BOM_UTF8.take(newBuffer.length()))) {
return Map.entry(newBuffer, State.ProcessBOM); // we've accumulated less than the full BOM, let's pull some more
} else {
return Map.entry(newBuffer, State.Pull); // We've accumulated less than BOM size but we already know that these bytes aren't BOM
}
}
}

private static byte[] doPull(byte[] bytes, byte[] buffer, FlowEmit<String> output) throws Exception {
private static ByteChunk doPull(ByteChunk bytes, ByteChunk buffer, FlowEmit<String> output) throws Exception {
var result = processSingleChunk(buffer, bytes);
Optional<String> str = result.getKey();
if (str.isPresent()) {
Expand All @@ -77,67 +70,46 @@ private static byte[] doPull(byte[] bytes, byte[] buffer, FlowEmit<String> outpu
return result.getValue();
}

private static Map.Entry<Optional<String>, byte[]> processSingleChunk(byte[] buffer, byte[] nextBytes) {
byte[] allBytes;
if (buffer == null || buffer.length == 0) {
private static Map.Entry<Optional<String>, ByteChunk> processSingleChunk(ByteChunk buffer, ByteChunk nextBytes) {
ByteChunk allBytes;
if (buffer == null || buffer.length() == 0) {
allBytes = nextBytes;
} else {
allBytes = Arrays.copyOf(buffer, buffer.length + nextBytes.length);
allBytes = ByteBuffer.wrap(allBytes).put(buffer.length, nextBytes).array();
allBytes = buffer.concat(nextBytes);
}

int splitAt = allBytes.length - lastIncompleteBytes(allBytes);
if (splitAt == allBytes.length) {
int splitAt = allBytes.length() - lastIncompleteBytes(allBytes);
if (splitAt == allBytes.length()) {
// in the common case of ASCII chars
// we are in this branch so the next buffer will
// be empty
return Map.entry(Optional.of(new String(allBytes, StandardCharsets.UTF_8)), new byte[0]);
return Map.entry(Optional.of(new String(allBytes.toArray(), StandardCharsets.UTF_8)), ByteChunk.empty());
} else if (splitAt == 0) {
return Map.entry(Optional.empty(), allBytes);
} else {
Map.Entry<ByteChunk, ByteChunk> result = allBytes.splitAt(splitAt);
return Map.entry(
// character
Optional.of(new String(Arrays.copyOfRange(allBytes, 0, splitAt), StandardCharsets.UTF_8)),
Optional.of(new String(result.getKey().toArray(), StandardCharsets.UTF_8)),
// remaining bytes
Arrays.copyOfRange(allBytes, splitAt, allBytes.length)
result.getValue()
);
}
}

/**
* Takes n elements from the beginning of the array and returns copy of the result
*/
private static byte[] take(byte[] a, int n) {
return Arrays.copyOfRange(a, 0, n);
}

/**
* Drops n elements from the beginning of the array and returns copy of the result
*/
private static byte[] drop(byte[] a, int n) {
return Arrays.copyOfRange(a, n, a.length);
}

/**
* Checks if array a starts with array b
*/
private static boolean startsWith(byte[] a, byte[] b) {
return ByteBuffer.wrap(a, 0, b.length).equals(ByteBuffer.wrap(b));
}

/*
* Copied from scala lib fs2 (fs2.text.decodeC.lastIncompleteBytes)
* Returns the length of an incomplete multi-byte sequence at the end of
* `bs`. If `bs` ends with an ASCII byte or a complete multi-byte sequence,
* 0 is returned.
*/
private static int lastIncompleteBytes(byte[] bs) {
int minIdx = Math.max(0, bs.length - 3);
int idx = bs.length - 1;
private static int lastIncompleteBytes(ByteChunk bs) {
int minIdx = Math.max(0, bs.length() - 3);
int idx = bs.length() - 1;
int counter = 0;
int res = 0;
while (minIdx <= idx) {
int c = continuationBytes(bs[idx]);
int c = continuationBytes(bs.get(idx));
if (c >= 0) {
if (c != counter) {
res = counter + 1;
Expand Down
Loading

0 comments on commit 7700513

Please sign in to comment.