Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

01. Refactored SerialInputOutputManager #615

Merged
merged 10 commits into from
Jan 28, 2025
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1434,7 +1434,7 @@ public void IoManager() throws Exception {
} catch (IllegalStateException ignored) {
}
try {
usb.ioManager.run();
usb.ioManager.runRead();
fail("already running error expected");
} catch (IllegalStateException ignored) {
}
Expand Down Expand Up @@ -1502,7 +1502,7 @@ public void IoManager() throws Exception {
usb.setParameters(19200, 8, 1, UsbSerialPort.PARITY_NONE);
telnet.setParameters(19200, 8, 1, UsbSerialPort.PARITY_NONE);
usb.ioManager.setThreadPriority(Process.THREAD_PRIORITY_DEFAULT);
Executors.newSingleThreadExecutor().submit(usb.ioManager);
usb.ioManager.start();
usb.waitForIoManagerStarted();
try {
usb.ioManager.start();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,21 +9,23 @@
import android.os.Process;
import android.util.Log;

import com.hoho.android.usbserial.driver.SerialTimeoutException;
import com.hoho.android.usbserial.driver.UsbSerialPort;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;

/**
* Utility class which services a {@link UsbSerialPort} in its {@link #run()} method.
* Utility class which services a {@link UsbSerialPort} in its {@link #runWrite()} ()} and {@link #runRead()} ()} ()} methods.
*
* @author mike wakerly ([email protected])
*/
public class SerialInputOutputManager implements Runnable {
public class SerialInputOutputManager {

public enum State {
STOPPED,
STARTING,
RUNNING,
STOPPING
}
Expand All @@ -33,9 +35,6 @@ public enum State {
private static final String TAG = SerialInputOutputManager.class.getSimpleName();
private static final int BUFSIZ = 4096;

/**
* default read timeout is infinite, to avoid data loss with bulkTransfer API
*/
private int mReadTimeout = 0;
private int mWriteTimeout = 0;

Expand All @@ -46,7 +45,9 @@ public enum State {
private ByteBuffer mWriteBuffer = ByteBuffer.allocate(BUFSIZ);

private int mThreadPriority = Process.THREAD_PRIORITY_URGENT_AUDIO;
private State mState = State.STOPPED; // Synchronized by 'this'
private final AtomicReference<State> mState = new AtomicReference<>(State.STOPPED);
private CountDownLatch mStartuplatch = new CountDownLatch(2);
private CountDownLatch mShutdownlatch = new CountDownLatch(2);
private Listener mListener; // Synchronized by 'this'
private final UsbSerialPort mSerialPort;

Expand All @@ -57,7 +58,7 @@ public interface Listener {
void onNewData(byte[] data);

/**
* Called when {@link SerialInputOutputManager#run()} aborts due to an error.
* Called when {@link SerialInputOutputManager#runRead()} ()} or {@link SerialInputOutputManager#runWrite()} ()} ()} aborts due to an error.
*/
void onRunError(Exception e);
}
Expand Down Expand Up @@ -87,17 +88,33 @@ public synchronized Listener getListener() {
* @param threadPriority see {@link Process#setThreadPriority(int)}
* */
public void setThreadPriority(int threadPriority) {
if (mState != State.STOPPED)
if (!mState.compareAndSet(State.STOPPED, State.STOPPED)) {
throw new IllegalStateException("threadPriority only configurable before SerialInputOutputManager is started");
}
mThreadPriority = threadPriority;
}

/**
dkaukov marked this conversation as resolved.
Show resolved Hide resolved
* read/write buffer size
*/
public void setReadBufferSize(int bufferSize) {
if (getReadBufferSize() == bufferSize)
return;
synchronized (mReadBufferLock) {
mReadBuffer = ByteBuffer.allocate(bufferSize);
}
}

public int getReadBufferSize() {
return mReadBuffer.capacity();
}

/**
* read/write timeout
*/
public void setReadTimeout(int timeout) {
// when set if already running, read already blocks and the new value will not become effective now
if(mReadTimeout == 0 && timeout != 0 && mState != State.STOPPED)
if(mReadTimeout == 0 && timeout != 0 && mState.get() != State.STOPPED)
throw new IllegalStateException("readTimeout only configurable before SerialInputOutputManager is started");
mReadTimeout = timeout;
}
Expand All @@ -114,21 +131,6 @@ public int getWriteTimeout() {
return mWriteTimeout;
}

/**
* read/write buffer size
*/
public void setReadBufferSize(int bufferSize) {
if (getReadBufferSize() == bufferSize)
return;
synchronized (mReadBufferLock) {
mReadBuffer = ByteBuffer.allocate(bufferSize);
}
}

public int getReadBufferSize() {
return mReadBuffer.capacity();
}

public void setWriteBufferSize(int bufferSize) {
if(getWriteBufferSize() == bufferSize)
return;
Expand All @@ -155,81 +157,137 @@ public void writeAsync(byte[] data) {
}

/**
* start SerialInputOutputManager in separate thread
* start SerialInputOutputManager in separate threads
*/
public void start() {
if(mState != State.STOPPED)
if(mState.compareAndSet(State.STOPPED, State.STARTING)) {
mStartuplatch = new CountDownLatch(2);
mShutdownlatch = new CountDownLatch(2);
new Thread(this::runRead, this.getClass().getSimpleName() + "_read").start();
new Thread(this::runWrite, this.getClass().getSimpleName() + "_write").start();
try {
mStartuplatch.await();
mState.set(State.RUNNING);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
} else {
throw new IllegalStateException("already started");
new Thread(this, this.getClass().getSimpleName()).start();
}
}

/**
* stop SerialInputOutputManager thread
* stop SerialInputOutputManager threads
*
* when using readTimeout == 0 (default), additionally use usbSerialPort.close() to
* interrupt blocking read
*/
public synchronized void stop() {
if (getState() == State.RUNNING) {
public void stop() {
if(mState.compareAndSet(State.RUNNING, State.STOPPING)) {
Log.i(TAG, "Stop requested");
mState = State.STOPPING;
}
}

public synchronized State getState() {
return mState;
public State getState() {
return mState.get();
}

/**
* Continuously services the read and write buffers until {@link #stop()} is
* called, or until a driver exception is raised.
* @return true if the thread is still running
*/
@Override
public void run() {
synchronized (this) {
if (getState() != State.STOPPED) {
throw new IllegalStateException("Already running");
private boolean isStillRunning() {
State state = mState.get();
return ((state == State.RUNNING) || (state == State.STARTING))
&& (mShutdownlatch.getCount() == 2)
dkaukov marked this conversation as resolved.
Show resolved Hide resolved
&& !Thread.currentThread().isInterrupted();
dkaukov marked this conversation as resolved.
Show resolved Hide resolved
}

/**
* Notify listener of an error
*
* @param e the exception
*/
private void notifyErrorListener(Throwable e) {
Listener listener = getListener();
if (listener != null) {
try {
listener.onRunError(e instanceof Exception ? (Exception) e : new Exception(e));
} catch (Throwable t) {
Log.w(TAG, "Exception in onRunError: " + t.getMessage(), t);
}
mState = State.RUNNING;
}
Log.i(TAG, "Running ...");
}

/**
* Set the thread priority
*/
private void setThreadPriority() {
if (mThreadPriority != Process.THREAD_PRIORITY_DEFAULT) {
Process.setThreadPriority(mThreadPriority);
}
}

/**
* Continuously services the read buffers until {@link #stop()} is called, or until a driver exception is
* raised.
*/
public void runRead() {
Log.i(TAG, "runRead running ...");
try {
if(mThreadPriority != Process.THREAD_PRIORITY_DEFAULT)
Process.setThreadPriority(mThreadPriority);
while (true) {
if (getState() != State.RUNNING) {
Log.i(TAG, "Stopping mState=" + getState());
break;
}
step();
}
setThreadPriority();
mStartuplatch.countDown();
do {
stepRead();
} while (isStillRunning());
Log.i(TAG, "runRead: Stopping mState=" + getState());
} catch (Throwable e) {
if(mSerialPort.isOpen()) {
dkaukov marked this conversation as resolved.
Show resolved Hide resolved
Log.w(TAG, "Run ending due to exception: " + e.getMessage(), e);
if (Thread.currentThread().isInterrupted()) {
Log.w(TAG, "Thread interrupted, stopping runRead.");
} else {
Log.i(TAG, "Socket closed");
Log.w(TAG, "runRead ending due to exception: " + e.getMessage(), e);
notifyErrorListener(e);
}
final Listener listener = getListener();
if (listener != null) {
try {
if (e instanceof Exception) {
listener.onRunError((Exception) e);
} else {
listener.onRunError(new Exception(e));
}
} catch (Throwable t) {
Log.w(TAG, "Exception in onRunError: " + t.getMessage(), t);
} finally {
if (!mState.compareAndSet(State.RUNNING, State.STOPPING)) {
if (mState.compareAndSet(State.STOPPING, State.STOPPED)) {
Log.i(TAG, "runRead: Stopped mState=" + getState());
}
}
mShutdownlatch.countDown();
}
}

/**
* Continuously services the write buffers until {@link #stop()} is called, or until a driver exception is
* raised.
*/
public void runWrite() {
Log.i(TAG, "runWrite running ...");
try {
setThreadPriority();
mStartuplatch.countDown();
do {
stepWrite();
dkaukov marked this conversation as resolved.
Show resolved Hide resolved
} while (isStillRunning());
Log.i(TAG, "runWrite: Stopping mState=" + getState());
} catch (Throwable e) {
if (Thread.currentThread().isInterrupted()) {
Log.w(TAG, "Thread interrupted, stopping runWrite.");
} else {
Log.w(TAG, "runWrite ending due to exception: " + e.getMessage(), e);
notifyErrorListener(e);
}
} finally {
synchronized (this) {
mState = State.STOPPED;
Log.i(TAG, "Stopped");
if (!mState.compareAndSet(State.RUNNING, State.STOPPING)) {
if (mState.compareAndSet(State.STOPPING, State.STOPPED)) {
Log.i(TAG, "runWrite: Stopped mState=" + getState());
}
}
mShutdownlatch.countDown();
}
}

private void step() throws IOException {
private void stepRead() throws IOException {
// Handle incoming data.
byte[] buffer;
synchronized (mReadBufferLock) {
Expand All @@ -247,11 +305,13 @@ private void step() throws IOException {
listener.onNewData(data);
}
}
}

private void stepWrite() throws IOException {
// Handle outgoing data.
buffer = null;
byte[] buffer = null;
synchronized (mWriteBufferLock) {
len = mWriteBuffer.position();
int len = mWriteBuffer.position();
if (len > 0) {
buffer = new byte[len];
mWriteBuffer.rewind();
Expand All @@ -261,25 +321,9 @@ private void step() throws IOException {
}
if (buffer != null) {
if (DEBUG) {
Log.d(TAG, "Writing data len=" + len);
}
try {
mSerialPort.write(buffer, mWriteTimeout);
} catch (SerialTimeoutException ex) {
synchronized (mWriteBufferLock) {
byte[] buffer2 = null;
int len2 = mWriteBuffer.position();
if (len2 > 0) {
buffer2 = new byte[len2];
mWriteBuffer.rewind();
mWriteBuffer.get(buffer2, 0, len2);
mWriteBuffer.clear();
}
mWriteBuffer.put(buffer, ex.bytesTransferred, buffer.length - ex.bytesTransferred);
if (buffer2 != null)
mWriteBuffer.put(buffer2);
}
Log.d(TAG, "Writing data len=" + buffer.length);
}
mSerialPort.write(buffer, mWriteTimeout);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,13 +39,13 @@ class ErrorListener implements SerialInputOutputManager.Listener {

ExceptionListener exceptionListener = new ExceptionListener();
manager.setListener(exceptionListener);
manager.run();
manager.runRead();
assertEquals(RuntimeException.class, exceptionListener.e.getClass());
assertEquals("exception1", exceptionListener.e.getMessage());

ErrorListener errorListener = new ErrorListener();
manager.setListener(errorListener);
manager.run();
manager.runRead();
assertEquals(Exception.class, errorListener.e.getClass());
assertEquals("java.lang.UnknownError: error1", errorListener.e.getMessage());
assertEquals(UnknownError.class, errorListener.e.getCause().getClass());
Expand Down