From c2ae54e130000013ccdbae9bf3e7c1dd31e85449 Mon Sep 17 00:00:00 2001 From: Frank Austin Nothaft Date: Fri, 18 Aug 2017 00:33:57 -0700 Subject: [PATCH] Update BGZF block decode logic and refactor BGZFSplitGuesser on BaseSplitGuesser Stems out of work done in #142 and #149. Refactors BGZFSplitGuesser to use the logic in the BaseSplitGuesser class. To do this, we need to make BaseSplitGuesser public. Additionally, we replace the logic that is used in BaseSplitGuesser to identify the start of a BGZF block with logic that doesn't require copying bytes from the input stream to a buffer. This improves the performance of identifying the start of a BGZF block; for whatever reason, that copy to the buffer/methods on the buffer are expensive. --- .../seqdoop/hadoop_bam/BCFSplitGuesser.java | 3 + .../seqdoop/hadoop_bam/BaseSplitGuesser.java | 138 ++++++++---------- .../seqdoop/hadoop_bam/util/BGZFCodec.java | 4 +- .../util/BGZFEnhancedGzipCodec.java | 4 +- .../util/BGZFSplitFileInputFormat.java | 7 +- .../hadoop_bam/util/BGZFSplitGuesser.java | 121 ++++----------- .../hadoop_bam/TestBGZFSplitGuesser.java | 4 +- 7 files changed, 99 insertions(+), 182 deletions(-) diff --git a/src/main/java/org/seqdoop/hadoop_bam/BCFSplitGuesser.java b/src/main/java/org/seqdoop/hadoop_bam/BCFSplitGuesser.java index 70cb533..ced7fb6 100644 --- a/src/main/java/org/seqdoop/hadoop_bam/BCFSplitGuesser.java +++ b/src/main/java/org/seqdoop/hadoop_bam/BCFSplitGuesser.java @@ -364,6 +364,9 @@ private long getUInt(final int idx) { private short getUByte(final int idx) { return (short)((short)buf.get(idx) & 0xff); } + private short getUShort(final int idx) { + return (short)(buf.getShort(idx) & 0xffff); + } public static void main(String[] args) throws IOException { final GenericOptionsParser parser; diff --git a/src/main/java/org/seqdoop/hadoop_bam/BaseSplitGuesser.java b/src/main/java/org/seqdoop/hadoop_bam/BaseSplitGuesser.java index 6f96b39..5c19bb3 100644 --- a/src/main/java/org/seqdoop/hadoop_bam/BaseSplitGuesser.java +++ b/src/main/java/org/seqdoop/hadoop_bam/BaseSplitGuesser.java @@ -6,11 +6,14 @@ import java.nio.ByteOrder; import org.apache.hadoop.io.IOUtils; -class BaseSplitGuesser { +public class BaseSplitGuesser { - protected final static int BGZF_MAGIC = 0x04088b1f; - protected final static int BGZF_MAGIC_SUB = 0x00024342; - protected final static int BGZF_SUB_SIZE = 4 + 2; + private final static int BGZF_MAGIC_0 = 0x1f; + private final static int BGZF_MAGIC_1 = 0x8b; + private final static int BGZF_MAGIC_2 = 0x08; + private final static int BGZF_MAGIC_3 = 0x04; + protected final static int BGZF_MAGIC = 0x04088b1f; + private final static int BGZF_MAGIC_SUB = 0x00024342; protected SeekableStream in; protected final ByteBuffer buf; @@ -29,85 +32,60 @@ protected static class PosSize { // Gives the compressed size on the side. Returns null if it doesn't find // anything. protected PosSize guessNextBGZFPos(int p, int end) { - try { for (;;) { - for (;;) { - in.seek(p); - IOUtils.readFully(in, buf.array(), 0, 4); - int n = buf.getInt(0); - - if (n == BGZF_MAGIC) - break; - - // Skip ahead a bit more than 1 byte if you can. - if (n >>> 8 == BGZF_MAGIC << 8 >>> 8) - ++p; - else if (n >>> 16 == BGZF_MAGIC << 16 >>> 16) - p += 2; - else - p += 3; - - if (p >= end) - return null; - } - // Found what looks like a gzip block header: now get XLEN and - // search for the BGZF subfield. - final int p0 = p; - p += 10; - in.seek(p); - IOUtils.readFully(in, buf.array(), 0, 2); - p += 2; - final int xlen = getUShort(0); - final int subEnd = p + xlen; - - while (p < subEnd) { - IOUtils.readFully(in, buf.array(), 0, 4); - - if (buf.getInt(0) != BGZF_MAGIC_SUB) { - p += 4 + getUShort(2); - in.seek(p); - continue; - } - - // Found it: this is close enough to a BGZF block, make it - // our guess. - - // But find out the size before returning. First, grab bsize: - // we'll need it later. - IOUtils.readFully(in, buf.array(), 0, 2); - int bsize = getUShort(0); - - // Then skip the rest of the subfields. - p += BGZF_SUB_SIZE; - while (p < subEnd) { + try { + while(true) { + boolean found_block_start = false; + boolean in_magic = false; in.seek(p); - IOUtils.readFully(in, buf.array(), 0, 4); - p += 4 + getUShort(2); - } - if (p != subEnd) { - // Cancel our guess because the xlen field didn't match the - // data. - break; - } - - // Now skip past the compressed data and the CRC-32. - p += bsize - xlen - 19 + 4; - in.seek(p); - IOUtils.readFully(in, buf.array(), 0, 4); - return new PosSize(p0, buf.getInt(0)); + while(!found_block_start) { + int n = in.read(); + + if (n == BGZF_MAGIC_0) { + in_magic = true; + } else if (n == BGZF_MAGIC_3 && in_magic) { + found_block_start = true; + } else if (p >= end) { + return null; + } else if (!((n == BGZF_MAGIC_1 && in_magic) || + (n == BGZF_MAGIC_2 && in_magic))) { + in_magic = false; + } + p++; + } + + // after the magic number: + // skip 6 unspecified bytes (MTIME, XFL, OS) + // XLEN = 6 (little endian, so 0x0600) + // SI1 = 0x42 + // SI2 = 0x43 + // SLEN = 0x02 + in.seek(p + 6); + int n = in.read(); + if (0x06 != n) { + continue; + } + n = in.read(); + if (0x00 != n) { + continue; + } + n = in.read(); + if (0x42 != n) { + continue; + } + n = in.read(); + if (0x43 != n) { + continue; + } + n = in.read(); + if (0x02 != n) { + continue; + } + int blockSize = (in.read() << 8) + in.read(); + + return new PosSize(p - 4, blockSize); } - // No luck: look for the next gzip block header. Start right after - // where we last saw the identifiers, although we could probably - // safely skip further ahead. (If we find the correct one right - // now, the previous block contained 0x1f8b0804 bytes of data: that - // seems... unlikely.) - p = p0 + 4; - - }} catch (IOException e) { + } catch (IOException e) { return null; } } - - protected int getUShort(final int idx) { - return (int)buf.getShort(idx) & 0xffff; - } } diff --git a/src/main/java/org/seqdoop/hadoop_bam/util/BGZFCodec.java b/src/main/java/org/seqdoop/hadoop_bam/util/BGZFCodec.java index 669e2ac..e453fb9 100644 --- a/src/main/java/org/seqdoop/hadoop_bam/util/BGZFCodec.java +++ b/src/main/java/org/seqdoop/hadoop_bam/util/BGZFCodec.java @@ -3,6 +3,7 @@ import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; +import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.Seekable; import org.apache.hadoop.io.compress.CompressionCodec; import org.apache.hadoop.io.compress.CompressionOutputStream; @@ -11,6 +12,7 @@ import org.apache.hadoop.io.compress.GzipCodec; import org.apache.hadoop.io.compress.SplitCompressionInputStream; import org.apache.hadoop.io.compress.SplittableCompressionCodec; +import org.seqdoop.hadoop_bam.util.WrapSeekable; /** * A Hadoop {@link CompressionCodec} for the @@ -56,7 +58,7 @@ public Compressor createCompressor() { @Override public SplitCompressionInputStream createInputStream(InputStream seekableIn, Decompressor decompressor, long start, long end, READ_MODE readMode) throws IOException { - BGZFSplitGuesser splitGuesser = new BGZFSplitGuesser(seekableIn); + BGZFSplitGuesser splitGuesser = new BGZFSplitGuesser(new WrapSeekable((FSDataInputStream)seekableIn, end, null)); long adjustedStart = splitGuesser.guessNextBGZFBlockStart(start, end); ((Seekable)seekableIn).seek(adjustedStart); return new BGZFSplitCompressionInputStream(seekableIn, adjustedStart, end); diff --git a/src/main/java/org/seqdoop/hadoop_bam/util/BGZFEnhancedGzipCodec.java b/src/main/java/org/seqdoop/hadoop_bam/util/BGZFEnhancedGzipCodec.java index 04112a7..0df0c77 100644 --- a/src/main/java/org/seqdoop/hadoop_bam/util/BGZFEnhancedGzipCodec.java +++ b/src/main/java/org/seqdoop/hadoop_bam/util/BGZFEnhancedGzipCodec.java @@ -4,6 +4,7 @@ import java.io.BufferedInputStream; import java.io.IOException; import java.io.InputStream; +import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.Seekable; import org.apache.hadoop.io.compress.CompressionCodec; import org.apache.hadoop.io.compress.CompressionInputStream; @@ -11,6 +12,7 @@ import org.apache.hadoop.io.compress.GzipCodec; import org.apache.hadoop.io.compress.SplitCompressionInputStream; import org.apache.hadoop.io.compress.SplittableCompressionCodec; +import org.seqdoop.hadoop_bam.util.WrapSeekable; /** * A Hadoop {@link CompressionCodec} for the @@ -66,7 +68,7 @@ public int read() throws IOException { } }; } - BGZFSplitGuesser splitGuesser = new BGZFSplitGuesser(seekableIn); + BGZFSplitGuesser splitGuesser = new BGZFSplitGuesser(new WrapSeekable((FSDataInputStream)seekableIn, end, null)); long adjustedStart = splitGuesser.guessNextBGZFBlockStart(start, end); ((Seekable)seekableIn).seek(adjustedStart); return new BGZFSplitCompressionInputStream(seekableIn, adjustedStart, end); diff --git a/src/main/java/org/seqdoop/hadoop_bam/util/BGZFSplitFileInputFormat.java b/src/main/java/org/seqdoop/hadoop_bam/util/BGZFSplitFileInputFormat.java index 09eedbb..044fea2 100644 --- a/src/main/java/org/seqdoop/hadoop_bam/util/BGZFSplitFileInputFormat.java +++ b/src/main/java/org/seqdoop/hadoop_bam/util/BGZFSplitFileInputFormat.java @@ -36,6 +36,8 @@ import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.FileSplit; +import org.seqdoop.hadoop_bam.util.WrapSeekable; + /** An {@link org.apache.hadoop.mapreduce.InputFormat} for BGZF-compressed * files. * @@ -130,9 +132,7 @@ private int addProbabilisticSplits( throws IOException { final Path path = ((FileSplit)splits.get(i)).getPath(); - final FSDataInputStream in = path.getFileSystem(cfg).open(path); - - final BGZFSplitGuesser guesser = new BGZFSplitGuesser(in); + final BGZFSplitGuesser guesser = new BGZFSplitGuesser(WrapSeekable.openPath(cfg, path)); FileSplit fspl; do { @@ -149,7 +149,6 @@ private int addProbabilisticSplits( ++i; } while (i < splits.size() && fspl.getPath().equals(path)); - in.close(); return i; } diff --git a/src/main/java/org/seqdoop/hadoop_bam/util/BGZFSplitGuesser.java b/src/main/java/org/seqdoop/hadoop_bam/util/BGZFSplitGuesser.java index 9835ff5..b944930 100644 --- a/src/main/java/org/seqdoop/hadoop_bam/util/BGZFSplitGuesser.java +++ b/src/main/java/org/seqdoop/hadoop_bam/util/BGZFSplitGuesser.java @@ -22,6 +22,7 @@ package org.seqdoop.hadoop_bam.util; +import htsjdk.samtools.seekablestream.SeekableStream; import htsjdk.samtools.seekablestream.ByteArraySeekableStream; import java.io.IOException; import java.io.InputStream; @@ -34,30 +35,14 @@ import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.Seekable; -public class BGZFSplitGuesser { - private InputStream inFile; - private Seekable seekableInFile; - private ByteArraySeekableStream in; - private final ByteBuffer buf; +import org.seqdoop.hadoop_bam.BaseSplitGuesser; - private final static int BGZF_MAGIC = 0x04088b1f; - private final static int BGZF_MAGIC_SUB = 0x00024342; - private final static int BGZF_SUB_SIZE = 4 + 2; +public class BGZFSplitGuesser extends BaseSplitGuesser { - public BGZFSplitGuesser(InputStream is) { - inFile = is; - seekableInFile = (Seekable) is; - - buf = ByteBuffer.allocate(8); - buf.order(ByteOrder.LITTLE_ENDIAN); - } + private SeekableStream inFile; - public BGZFSplitGuesser(FSDataInputStream is) { + public BGZFSplitGuesser(SeekableStream is) { inFile = is; - seekableInFile = is; - - buf = ByteBuffer.allocate(8); - buf.order(ByteOrder.LITTLE_ENDIAN); } /// Looks in the range [beg,end). Returns end if no BAM record was found. @@ -69,17 +54,23 @@ public long guessNextBGZFBlockStart(long beg, long end) // the previous one, we need 0xfffe bytes for the start, and then 0xffff // for the block we're looking for. - byte[] arr = new byte[2*0xffff - 1]; - - this.seekableInFile.seek(beg); - int totalRead = 0; - for (int left = Math.min((int)(end - beg), arr.length); left > 0;) { - final int r = inFile.read(arr, totalRead, left); - if (r < 0) - break; - totalRead += r; - left -= r; - } + byte[] arr = new byte[0xffff]; + + this.inFile.seek(beg); + int bytesToRead = Math.min((int) (end - beg), arr.length); + int totalRead = 0; + // previously, this code did a single read and assumed that if it did not + // return an error code, then it had filled the array. however, when an + // incomplete copy occurs, the split picker will try to read past the end + // of the bucket, which will lead to the split picker returning an error + // code (-1), which gets mishandled elsewhere... + while(totalRead < bytesToRead) { + int read = inFile.read(arr, totalRead, bytesToRead - totalRead); + if (read == -1) { + return -1; // EOF + } + totalRead += read; + } arr = Arrays.copyOf(arr, totalRead); this.in = new ByteArraySeekableStream(arr); @@ -90,11 +81,13 @@ public long guessNextBGZFBlockStart(long beg, long end) final int firstBGZFEnd = Math.min((int)(end - beg), 0xffff); + PosSize p; for (int pos = 0;;) { - pos = guessNextBGZFPos(pos, firstBGZFEnd); - if (pos < 0) + p = guessNextBGZFPos(pos, firstBGZFEnd); + if (p == null) return end; - + pos = p.pos; + try { // Seek in order to trigger decompression of the block and a CRC // check. @@ -110,64 +103,4 @@ public long guessNextBGZFBlockStart(long beg, long end) return beg + pos; } } - - // Returns a negative number if it doesn't find anything. - private int guessNextBGZFPos(int p, int end) - throws IOException - { - for (;;) { - for (;;) { - in.seek(p); - in.read(buf.array(), 0, 4); - int n = buf.getInt(0); - - if (n == BGZF_MAGIC) - break; - - // Skip ahead a bit more than 1 byte if you can. - if (n >>> 8 == BGZF_MAGIC << 8 >>> 8) - ++p; - else if (n >>> 16 == BGZF_MAGIC << 16 >>> 16) - p += 2; - else - p += 3; - - if (p >= end) - return -1; - } - // Found what looks like a gzip block header: now get XLEN and - // search for the BGZF subfield. - final int p0 = p; - p += 10; - in.seek(p); - in.read(buf.array(), 0, 2); - p += 2; - final int xlen = getUShort(0); - final int subEnd = p + xlen; - - while (p < subEnd) { - in.read(buf.array(), 0, 4); - - if (buf.getInt(0) != BGZF_MAGIC_SUB) { - p += 4 + getUShort(2); - in.seek(p); - continue; - } - - // Found it: this is close enough to a BGZF block, make it - // our guess. - return p0; - } - // No luck: look for the next gzip block header. Start right after - // where we last saw the identifiers, although we could probably - // safely skip further ahead. (If we find the correct one right - // now, the previous block contained 0x1f8b0804 bytes of data: that - // seems... unlikely.) - p = p0 + 4; - } - } - - private int getUShort(final int idx) { - return (int)buf.getShort(idx) & 0xffff; - } } diff --git a/src/test/java/org/seqdoop/hadoop_bam/TestBGZFSplitGuesser.java b/src/test/java/org/seqdoop/hadoop_bam/TestBGZFSplitGuesser.java index d500cdd..1eda5bc 100644 --- a/src/test/java/org/seqdoop/hadoop_bam/TestBGZFSplitGuesser.java +++ b/src/test/java/org/seqdoop/hadoop_bam/TestBGZFSplitGuesser.java @@ -14,7 +14,7 @@ import org.junit.runner.RunWith; import org.junit.runners.Parameterized; import org.seqdoop.hadoop_bam.util.BGZFSplitGuesser; - +import org.seqdoop.hadoop_bam.util.WrapSeekable; import static org.junit.Assert.assertEquals; @RunWith(Parameterized.class) @@ -42,7 +42,7 @@ public void test() throws IOException { Configuration conf = new Configuration(); Path path = new Path(file.toURI()); FSDataInputStream fsDataInputStream = path.getFileSystem(conf).open(path); - BGZFSplitGuesser bgzfSplitGuesser = new BGZFSplitGuesser(fsDataInputStream); + BGZFSplitGuesser bgzfSplitGuesser = new BGZFSplitGuesser(WrapSeekable.openPath(conf, path)); LinkedList boundaries = new LinkedList<>(); long start = 1; while (true) {