diff --git a/okdownload-connection-okhttp/src/test/java/com/liulishuo/okdownload/core/connection/DownloadOkHttp3ConnectionTest.java b/okdownload-connection-okhttp/src/test/java/com/liulishuo/okdownload/core/connection/DownloadOkHttp3ConnectionTest.java index 4c0730a2..49dfea84 100644 --- a/okdownload-connection-okhttp/src/test/java/com/liulishuo/okdownload/core/connection/DownloadOkHttp3ConnectionTest.java +++ b/okdownload-connection-okhttp/src/test/java/com/liulishuo/okdownload/core/connection/DownloadOkHttp3ConnectionTest.java @@ -123,8 +123,10 @@ public void getInputStream_executed_getRightInputStream() throws IOException { .body(body).build(); when(call.execute()).thenReturn(response); + final BufferedSource source = mock(BufferedSource.class); + when(body.source()).thenReturn(source); final InputStream expectedInputStream = mock(InputStream.class); - when(body.byteStream()).thenReturn(expectedInputStream); + when(source.inputStream()).thenReturn(expectedInputStream); connection.execute(); diff --git a/okdownload/src/main/java/com/liulishuo/okdownload/core/file/MultiPointOutputStream.java b/okdownload/src/main/java/com/liulishuo/okdownload/core/file/MultiPointOutputStream.java index 7560604b..ea8f1fea 100644 --- a/okdownload/src/main/java/com/liulishuo/okdownload/core/file/MultiPointOutputStream.java +++ b/okdownload/src/main/java/com/liulishuo/okdownload/core/file/MultiPointOutputStream.java @@ -48,6 +48,8 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.LockSupport; +import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; + public class MultiPointOutputStream { private static final String TAG = "MultiPointOutputStream"; private static final ExecutorService FILE_IO_EXECUTOR = new ThreadPoolExecutor(0, @@ -60,6 +62,7 @@ public class MultiPointOutputStream { final SparseArray noSyncLengthMap = new SparseArray<>(); final AtomicLong allNoSyncLength = new AtomicLong(); final AtomicLong lastSyncTimestamp = new AtomicLong(); + boolean canceled = false; private final int flushBufferSize; private final int syncBufferSize; @@ -80,6 +83,7 @@ public class MultiPointOutputStream { IOException syncException; @NonNull ArrayList noMoreStreamList; + @SuppressFBWarnings("IS2_INCONSISTENT_SYNC") List requireStreamBlocks; MultiPointOutputStream(@NonNull final DownloadTask task, @@ -119,7 +123,12 @@ public MultiPointOutputStream(@NonNull DownloadTask task, this(task, info, store, null); } - public void write(int blockIndex, byte[] bytes, int length) throws IOException { + public synchronized void write(int blockIndex, byte[] bytes, int length) throws IOException { + // if this task has been canceled, there is no need to write because of the output stream + // has been closed and there is no need to create a new output stream if this is a first + // write of this task block + if (canceled) return; + outputStream(blockIndex).write(bytes, 0, length); // because we add the length value after flush and sync, @@ -138,11 +147,16 @@ public void cancelAsync() { }); } - public void cancel() { + public synchronized void cancel() { if (requireStreamBlocks == null) return; + if (canceled) return; + canceled = true; + // must ensure sync thread is finished, then can invoke 'ensureSync(true, -1)' + // in try block, otherwise, try block will be blocked in 'ensureSync(true, -1)' and + // codes in finally block will not be invoked + noMoreStreamList.addAll(requireStreamBlocks); try { if (allNoSyncLength.get() <= 0) return; - noMoreStreamList.addAll(requireStreamBlocks); if (syncFuture != null && !syncFuture.isDone()) { inspectValidPath(); OkDownload.with().processFileStrategy().getFileLock().increaseLock(path); @@ -298,13 +312,13 @@ void inspectStreamState(StreamsState state) { final Set uniqueBlockList = new HashSet<>(clonedList); final int noMoreStreamBlockCount = uniqueBlockList.size(); if (noMoreStreamBlockCount != requireStreamBlocks.size()) { - Util.d(TAG, "current need fetch block count " + requireStreamBlocks.size() - + " is not equal to output stream created block count " + Util.d(TAG, "task[" + task.getId() + "] current need fetching block count " + + requireStreamBlocks.size() + " is not equal to no more stream block count " + noMoreStreamBlockCount); state.isNoMoreStream = false; } else { - Util.d(TAG, "current need fetch block count " + requireStreamBlocks.size() - + " is equal to output stream created block count " + Util.d(TAG, "task[" + task.getId() + "] current need fetching block count " + + requireStreamBlocks.size() + " is equal to no more stream block count " + noMoreStreamBlockCount); state.isNoMoreStream = true; } diff --git a/okdownload/src/test/java/com/liulishuo/okdownload/core/file/MultiPointOutputStreamTest.java b/okdownload/src/test/java/com/liulishuo/okdownload/core/file/MultiPointOutputStreamTest.java index f2ee9633..4d6617e7 100644 --- a/okdownload/src/test/java/com/liulishuo/okdownload/core/file/MultiPointOutputStreamTest.java +++ b/okdownload/src/test/java/com/liulishuo/okdownload/core/file/MultiPointOutputStreamTest.java @@ -143,7 +143,17 @@ public void write() throws IOException { } @Test - public void cancel_syncNotRun() throws IOException { + public void write_notRun_withCancelled() throws IOException { + multiPointOutputStream.canceled = true; + + multiPointOutputStream.write(0, bytes, 16); + + verify(multiPointOutputStream, never()).outputStream(0); + verify(multiPointOutputStream, never()).inspectAndPersist(); + } + + @Test + public void cancel_syncNotRun_withSyncFutureIsNull() throws IOException { multiPointOutputStream.requireStreamBlocks = new ArrayList() {{ add(0); add(1); @@ -169,14 +179,59 @@ public void cancel_syncNotRun() throws IOException { } @Test - public void cancel_requireStreamBlocksNotInitial() throws IOException { + public void cancel_syncNotRun_withSyncFutureHasDone() throws IOException { + multiPointOutputStream.requireStreamBlocks = new ArrayList() {{ + add(0); + add(1); + }}; multiPointOutputStream.allNoSyncLength.set(1); + doNothing().when(multiPointOutputStream).close(anyInt()); + doNothing().when(multiPointOutputStream).ensureSync(true, -1); + multiPointOutputStream.syncFuture = mock(Future.class); + when(multiPointOutputStream.syncFuture.isDone()).thenReturn(true); + final ProcessFileStrategy strategy = OkDownload.with().processFileStrategy(); final FileLock fileLock = mock(FileLock.class); when(strategy.getFileLock()).thenReturn(fileLock); multiPointOutputStream.cancel(); + assertThat(multiPointOutputStream.noMoreStreamList).containsExactly(0, 1); + verify(multiPointOutputStream, never()).ensureSync(eq(true), eq(-1)); + verify(multiPointOutputStream).close(eq(0)); + verify(multiPointOutputStream).close(eq(1)); + verify(fileLock, never()).increaseLock(eq(existFile.getAbsolutePath())); + verify(fileLock, never()).decreaseLock(eq(existFile.getAbsolutePath())); + verify(store).onTaskEnd(eq(task.getId()), eq(EndCause.CANCELED), nullable(Exception.class)); + } + + @Test + public void cancel_notRun_withRequireStreamBlocksNotInitial() throws IOException { + multiPointOutputStream.allNoSyncLength.set(1); + final ProcessFileStrategy strategy = OkDownload.with().processFileStrategy(); + final FileLock fileLock = mock(FileLock.class); + when(strategy.getFileLock()).thenReturn(fileLock); + + multiPointOutputStream.cancel(); + + verify(multiPointOutputStream, never()).ensureSync(anyBoolean(), anyInt()); + verify(fileLock, never()).increaseLock(eq(existFile.getAbsolutePath())); + verify(fileLock, never()).decreaseLock(eq(existFile.getAbsolutePath())); + verify(multiPointOutputStream, never()).close(anyInt()); + verify(store, never()).onTaskEnd(anyInt(), any(EndCause.class), any(Exception.class)); + } + + @Test + public void cancel_notRun_withCancelled() throws IOException { + multiPointOutputStream.allNoSyncLength.set(1); + multiPointOutputStream.canceled = true; + final ProcessFileStrategy strategy = OkDownload.with().processFileStrategy(); + final FileLock fileLock = mock(FileLock.class); + when(strategy.getFileLock()).thenReturn(fileLock); + + multiPointOutputStream.cancel(); + + assertThat(multiPointOutputStream.noMoreStreamList).hasSize(0); verify(multiPointOutputStream, never()).ensureSync(anyBoolean(), anyInt()); verify(fileLock, never()).increaseLock(eq(existFile.getAbsolutePath())); verify(fileLock, never()).decreaseLock(eq(existFile.getAbsolutePath())); @@ -532,7 +587,7 @@ public void inspectStreamState() { @Test public void setRequireStreamBlocks() { assertThat(multiPointOutputStream.requireStreamBlocks).isEqualTo(null); - final List requireStreamBlocks = new ArrayList(){{ + final List requireStreamBlocks = new ArrayList() {{ add(0); add(1); add(2); @@ -640,4 +695,5 @@ private void prepareOutputStreamEnv() throws FileNotFoundException, PreAllocateE when(task.getUri()).thenReturn(uri); when(uri.getScheme()).thenReturn("file"); } + } \ No newline at end of file