Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update BGZF block decode logic and refactor BGZFSplitGuesser on BaseSplitGuesser #179

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions src/main/java/org/seqdoop/hadoop_bam/BCFSplitGuesser.java
Original file line number Diff line number Diff line change
Expand Up @@ -364,6 +364,9 @@ private long getUInt(final int idx) {
private short getUByte(final int idx) {
return (short)((short)buf.get(idx) & 0xff);
}
private short getUShort(final int idx) {
return (short)(buf.getShort(idx) & 0xffff);
}

public static void main(String[] args) throws IOException {
final GenericOptionsParser parser;
Expand Down
138 changes: 58 additions & 80 deletions src/main/java/org/seqdoop/hadoop_bam/BaseSplitGuesser.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,14 @@
import java.nio.ByteOrder;
import org.apache.hadoop.io.IOUtils;

class BaseSplitGuesser {
public class BaseSplitGuesser {

protected final static int BGZF_MAGIC = 0x04088b1f;
protected final static int BGZF_MAGIC_SUB = 0x00024342;
protected final static int BGZF_SUB_SIZE = 4 + 2;
private final static int BGZF_MAGIC_0 = 0x1f;
private final static int BGZF_MAGIC_1 = 0x8b;
private final static int BGZF_MAGIC_2 = 0x08;
private final static int BGZF_MAGIC_3 = 0x04;
protected final static int BGZF_MAGIC = 0x04088b1f;
private final static int BGZF_MAGIC_SUB = 0x00024342;

protected SeekableStream in;
protected final ByteBuffer buf;
Expand All @@ -29,85 +32,60 @@ protected static class PosSize {
// Gives the compressed size on the side. Returns null if it doesn't find
// anything.
protected PosSize guessNextBGZFPos(int p, int end) {
try { for (;;) {
for (;;) {
in.seek(p);
IOUtils.readFully(in, buf.array(), 0, 4);
int n = buf.getInt(0);

if (n == BGZF_MAGIC)
break;

// Skip ahead a bit more than 1 byte if you can.
if (n >>> 8 == BGZF_MAGIC << 8 >>> 8)
++p;
else if (n >>> 16 == BGZF_MAGIC << 16 >>> 16)
p += 2;
else
p += 3;

if (p >= end)
return null;
}
// Found what looks like a gzip block header: now get XLEN and
// search for the BGZF subfield.
final int p0 = p;
p += 10;
in.seek(p);
IOUtils.readFully(in, buf.array(), 0, 2);
p += 2;
final int xlen = getUShort(0);
final int subEnd = p + xlen;

while (p < subEnd) {
IOUtils.readFully(in, buf.array(), 0, 4);

if (buf.getInt(0) != BGZF_MAGIC_SUB) {
p += 4 + getUShort(2);
in.seek(p);
continue;
}

// Found it: this is close enough to a BGZF block, make it
// our guess.

// But find out the size before returning. First, grab bsize:
// we'll need it later.
IOUtils.readFully(in, buf.array(), 0, 2);
int bsize = getUShort(0);

// Then skip the rest of the subfields.
p += BGZF_SUB_SIZE;
while (p < subEnd) {
try {
while(true) {
boolean found_block_start = false;
boolean in_magic = false;
in.seek(p);
IOUtils.readFully(in, buf.array(), 0, 4);
p += 4 + getUShort(2);
}
if (p != subEnd) {
// Cancel our guess because the xlen field didn't match the
// data.
break;
}

// Now skip past the compressed data and the CRC-32.
p += bsize - xlen - 19 + 4;
in.seek(p);
IOUtils.readFully(in, buf.array(), 0, 4);
return new PosSize(p0, buf.getInt(0));
while(!found_block_start) {
int n = in.read();

if (n == BGZF_MAGIC_0) {
in_magic = true;
} else if (n == BGZF_MAGIC_3 && in_magic) {
found_block_start = true;
} else if (p >= end) {
return null;
} else if (!((n == BGZF_MAGIC_1 && in_magic) ||
(n == BGZF_MAGIC_2 && in_magic))) {
in_magic = false;
}
p++;
}

// after the magic number:
// skip 6 unspecified bytes (MTIME, XFL, OS)
// XLEN = 6 (little endian, so 0x0600)
// SI1 = 0x42
// SI2 = 0x43
// SLEN = 0x02
in.seek(p + 6);
int n = in.read();
if (0x06 != n) {
continue;
}
n = in.read();
if (0x00 != n) {
continue;
}
n = in.read();
if (0x42 != n) {
continue;
}
n = in.read();
if (0x43 != n) {
continue;
}
n = in.read();
if (0x02 != n) {
continue;
}
int blockSize = (in.read() << 8) + in.read();

return new PosSize(p - 4, blockSize);
}
// No luck: look for the next gzip block header. Start right after
// where we last saw the identifiers, although we could probably
// safely skip further ahead. (If we find the correct one right
// now, the previous block contained 0x1f8b0804 bytes of data: that
// seems... unlikely.)
p = p0 + 4;

}} catch (IOException e) {
} catch (IOException e) {
return null;
}
}

protected int getUShort(final int idx) {
return (int)buf.getShort(idx) & 0xffff;
}
}
4 changes: 3 additions & 1 deletion src/main/java/org/seqdoop/hadoop_bam/util/BGZFCodec.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.Seekable;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionOutputStream;
Expand All @@ -11,6 +12,7 @@
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.io.compress.SplitCompressionInputStream;
import org.apache.hadoop.io.compress.SplittableCompressionCodec;
import org.seqdoop.hadoop_bam.util.WrapSeekable;

/**
* A Hadoop {@link CompressionCodec} for the
Expand Down Expand Up @@ -56,7 +58,7 @@ public Compressor createCompressor() {
@Override
public SplitCompressionInputStream createInputStream(InputStream seekableIn,
Decompressor decompressor, long start, long end, READ_MODE readMode) throws IOException {
BGZFSplitGuesser splitGuesser = new BGZFSplitGuesser(seekableIn);
BGZFSplitGuesser splitGuesser = new BGZFSplitGuesser(new WrapSeekable((FSDataInputStream)seekableIn, end, null));
long adjustedStart = splitGuesser.guessNextBGZFBlockStart(start, end);
((Seekable)seekableIn).seek(adjustedStart);
return new BGZFSplitCompressionInputStream(seekableIn, adjustedStart, end);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,15 @@
import java.io.BufferedInputStream;
import java.io.IOException;
import java.io.InputStream;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.Seekable;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionInputStream;
import org.apache.hadoop.io.compress.Decompressor;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.io.compress.SplitCompressionInputStream;
import org.apache.hadoop.io.compress.SplittableCompressionCodec;
import org.seqdoop.hadoop_bam.util.WrapSeekable;

/**
* A Hadoop {@link CompressionCodec} for the
Expand Down Expand Up @@ -66,7 +68,7 @@ public int read() throws IOException {
}
};
}
BGZFSplitGuesser splitGuesser = new BGZFSplitGuesser(seekableIn);
BGZFSplitGuesser splitGuesser = new BGZFSplitGuesser(new WrapSeekable((FSDataInputStream)seekableIn, end, null));
long adjustedStart = splitGuesser.guessNextBGZFBlockStart(start, end);
((Seekable)seekableIn).seek(adjustedStart);
return new BGZFSplitCompressionInputStream(seekableIn, adjustedStart, end);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

import org.seqdoop.hadoop_bam.util.WrapSeekable;

/** An {@link org.apache.hadoop.mapreduce.InputFormat} for BGZF-compressed
* files.
*
Expand Down Expand Up @@ -130,9 +132,7 @@ private int addProbabilisticSplits(
throws IOException
{
final Path path = ((FileSplit)splits.get(i)).getPath();
final FSDataInputStream in = path.getFileSystem(cfg).open(path);

final BGZFSplitGuesser guesser = new BGZFSplitGuesser(in);
final BGZFSplitGuesser guesser = new BGZFSplitGuesser(WrapSeekable.openPath(cfg, path));

FileSplit fspl;
do {
Expand All @@ -149,7 +149,6 @@ private int addProbabilisticSplits(
++i;
} while (i < splits.size() && fspl.getPath().equals(path));

in.close();
return i;
}

Expand Down
Loading