diff --git a/src/bai.ts b/src/bai.ts index bb925c2..7855158 100644 --- a/src/bai.ts +++ b/src/bai.ts @@ -2,7 +2,7 @@ import Long from 'long' import VirtualOffset, { fromBytes } from './virtualOffset' import Chunk from './chunk' -import IndexFile from './indexFile' +import IndexFile, { Props } from './indexFile' import { longToNumber, abortBreakPoint, canMergeBlocks } from './util' const BAI_MAGIC = 21578050 // BAI\1 @@ -20,8 +20,8 @@ export default class BAI extends IndexFile { return { lineCount } } - async lineCount(refId: number) { - const index = (await this.parse()).indices[refId] + async lineCount(refId: number, props: { signal?: AbortSignal } = {}) { + const index = (await this.parse(props)).indices[refId] if (!index) { return -1 } @@ -30,11 +30,18 @@ export default class BAI extends IndexFile { } // fetch and parse the index - async _parse(abortSignal?: AbortSignal) { + async _parse(props: { signal?: AbortSignal; statusCallback?: Function } = {}) { + const { signal, statusCallback } = props const data: { [key: string]: any } = { bai: true, maxBlockSize: 1 << 16 } - const bytes = (await this.filehandle.readFile({ - signal: abortSignal, - })) as Buffer + if (statusCallback) { + statusCallback('Downloading index') + } + + const bytes = (await this.filehandle.readFile(props)) as Buffer + + if (statusCallback) { + statusCallback('Parsing index') + } // check BAI magic numbers if (bytes.readUInt32LE(0) !== BAI_MAGIC) { @@ -49,7 +56,7 @@ export default class BAI extends IndexFile { data.indices = new Array(data.refCount) let currOffset = 8 for (let i = 0; i < data.refCount; i += 1) { - await abortBreakPoint(abortSignal) + await abortBreakPoint(signal) // the binning index const binCount = bytes.readInt32LE(currOffset) @@ -100,13 +107,16 @@ export default class BAI extends IndexFile { } async indexCov( - seqId: number, - start?: number, - end?: number, + { seqId, start, end }: { seqId: number; start?: number; end?: number }, + props: Props, ): Promise<{ start: 
number; end: number; score: number }[]> { + if (seqId === undefined) { + throw new Error('No seqId specified') + } + const v = 16384 const range = start !== undefined - const indexData = await this.parse() + const indexData = await this.parse(props) const seqIdx = indexData.indices[seqId] if (!seqIdx) { return [] } @@ -166,7 +176,7 @@ export default class BAI extends IndexFile { return list } - async blocksForRange(refId: number, min: number, max: number) { + async blocksForRange(refId: number, min: number, max: number, props: { signal?: AbortSignal } = {}) { if (min < 0) { min = 0 } diff --git a/src/bamFile.ts b/src/bamFile.ts index 4ea0a66..8ab7fad 100644 --- a/src/bamFile.ts +++ b/src/bamFile.ts @@ -9,20 +9,21 @@ import entries from 'object.entries-ponyfill' import LRU from 'quick-lru' import { LocalFile, RemoteFile, GenericFilehandle } from 'generic-filehandle' import BAMFeature from './record' -import IndexFile from './indexFile' +import IndexFile, { Props } from './indexFile' import { parseHeaderText } from './sam' import { abortBreakPoint, checkAbortSignal, timeout } from './util' +// BAM\1 encoded as integer const BAM_MAGIC = 21840194 const blockLen = 1 << 16 -type G = GenericFilehandle interface BamOpts { viewAsPairs?: boolean pairAcrossChr?: boolean maxInsertSize?: number signal?: AbortSignal + statusCallback?: Function } export default class BamFile { @@ -112,14 +113,12 @@ export default class BamFile { this.chunkSizeLimit = chunkSizeLimit || 300000000 // 300MB } - async getHeader(abortSignal?: AbortSignal) { - const indexData = await this.index.parse(abortSignal) + async getHeader(props: { signal?: AbortSignal; statusCallback?: Function } = {}) { + const indexData = await this.index.parse(props) const ret = indexData.firstDataLine ?
indexData.firstDataLine.blockPosition + 65535 : undefined let buffer if (ret) { - const res = await this.bam.read(Buffer.alloc(ret + blockLen), 0, ret + blockLen, 0, { - signal: abortSignal, - }) + const res = await this.bam.read(Buffer.alloc(ret + blockLen), 0, ret + blockLen, 0, props) const { bytesRead } = res ;({ buffer } = res) if (!bytesRead) { @@ -131,7 +130,7 @@ export default class BamFile { buffer = buffer.slice(0, ret) } } else { - buffer = (await this.bam.readFile({ signal: abortSignal })) as Buffer + buffer = (await this.bam.readFile(props)) as Buffer } const uncba = await unzip(buffer) @@ -142,7 +141,7 @@ export default class BamFile { const headLen = uncba.readInt32LE(4) this.header = uncba.toString('utf8', 8, 8 + headLen) - const { chrToIndex, indexToChr } = await this._readRefSeqs(headLen + 8, 65535, abortSignal) + const { chrToIndex, indexToChr } = await this._readRefSeqs(headLen + 8, 65535, props) this.chrToIndex = chrToIndex this.indexToChr = indexToChr @@ -154,17 +153,15 @@ export default class BamFile { async _readRefSeqs( start: number, refSeqBytes: number, - abortSignal?: AbortSignal, + props: Props = {}, ): Promise<{ chrToIndex: { [key: string]: number } indexToChr: { refName: string; length: number }[] }> { if (start > refSeqBytes) { - return this._readRefSeqs(start, refSeqBytes * 2) + return this._readRefSeqs(start, refSeqBytes * 2, props) } - const res = await this.bam.read(Buffer.alloc(refSeqBytes + blockLen), 0, refSeqBytes, 0, { - signal: abortSignal, - }) + const res = await this.bam.read(Buffer.alloc(refSeqBytes + blockLen), 0, refSeqBytes, 0, props) const { bytesRead } = res let { buffer } = res if (!bytesRead) { @@ -253,13 +250,13 @@ export default class BamFile { yield* this._fetchChunkFeatures(chunks, chrId, min, max, opts) } - async *_fetchChunkFeatures(chunks: Chunk[], chrId: number, min: number, max: number, opts: BamOpts) { + async *_fetchChunkFeatures(chunks: Chunk[], chrId: number, min: number, max: number, props: BamOpts) 
{ const featPromises = [] let done = false for (let i = 0; i < chunks.length; i++) { const c = chunks[i] - const { data, cpositions, dpositions, chunk } = await this.featureCache.get(c.toString(), c, opts.signal) + const { data, cpositions, dpositions, chunk } = await this.featureCache.get(c.toString(), c, props.signal) const promise = this.readBamFeatures(data, cpositions, dpositions, chunk).then(records => { const recs = [] for (let i = 0; i < records.length; i += 1) { @@ -284,14 +281,14 @@ export default class BamFile { } } - checkAbortSignal(opts.signal) + checkAbortSignal(props.signal) for (let i = 0; i < featPromises.length; i++) { yield featPromises[i] } - checkAbortSignal(opts.signal) - if (opts.viewAsPairs) { - yield this.fetchPairs(chrId, featPromises, opts) + checkAbortSignal(props.signal) + if (props.viewAsPairs) { + yield this.fetchPairs(chrId, featPromises, props) } } @@ -355,7 +352,7 @@ export default class BamFile { } const mateFeatPromises = mateChunks.map(async c => { const { data, cpositions, dpositions, chunk } = await this.featureCache.get(c.toString(), c, opts.signal) - const feats = await this.readBamFeatures(data, cpositions, dpositions, chunk) + const feats = await this.readBamFeatures(data, cpositions, dpositions, chunk, opts) const mateRecs = [] for (let i = 0; i < feats.length; i += 1) { const feature = feats[i] @@ -396,7 +393,7 @@ export default class BamFile { return { data, cpositions, dpositions, chunk } } - async readBamFeatures(ba: Buffer, cpositions: number[], dpositions: number[], chunk: Chunk) { + async readBamFeatures(ba: Buffer, cpositions: number[], dpositions: number[], chunk: Chunk, props: Props = {}) { let blockStart = 0 const sink = [] let pos = 0 @@ -436,6 +433,10 @@ export default class BamFile { featsSinceLastTimeout++ if (featsSinceLastTimeout > 500) { await timeout(1) + await checkAbortSignal(props.signal) + if (props.statusCallback) { + props.statusCallback(3, 500) + } featsSinceLastTimeout = 0 } } @@ -445,19 +446,19 @@ export
default class BamFile { return sink } - async hasRefSeq(seqName: string) { + async hasRefSeq(seqName: string, props: Props = {}) { const refId = this.chrToIndex && this.chrToIndex[seqName] - return this.index.hasRefSeq(refId) + return this.index.hasRefSeq(refId, props) } - async lineCount(seqName: string) { + async lineCount(seqName: string, props: Props = {}) { const refId = this.chrToIndex && this.chrToIndex[seqName] - return this.index.lineCount(refId) + return this.index.lineCount(refId, props) } - async indexCov(seqName: string, start?: number, end?: number) { - await this.index.parse() + async indexCov({ seqName, start, end }: { seqName: string; start?: number; end?: number }, props: Props = {}) { + await this.index.parse(props) const seqId = this.chrToIndex && this.chrToIndex[seqName] - return this.index.indexCov(seqId, start, end) + return this.index.indexCov({ seqId, start, end }, props) } } diff --git a/src/csi.ts b/src/csi.ts index cd6f0a2..246a13b 100644 --- a/src/csi.ts +++ b/src/csi.ts @@ -5,7 +5,7 @@ import { fromBytes } from './virtualOffset' import Chunk from './chunk' import { longToNumber, abortBreakPoint } from './util' -import IndexFile from './indexFile' +import IndexFile, { Props } from './indexFile' const CSI1_MAGIC = 21582659 // CSI\1 const CSI2_MAGIC = 38359875 // CSI\2 @@ -27,8 +27,8 @@ export default class CSI extends IndexFile { this.depth = 0 this.minShift = 0 } - async lineCount(refId: number): Promise<number> { - const indexData = await this.parse() + async lineCount(refId: number, opts: { signal?: AbortSignal } = {}): Promise<number> { + const indexData = await this.parse(opts) if (!indexData) { return -1 } @@ -94,9 +94,18 @@ export default class CSI extends IndexFile { } // fetch and parse the index - async _parse(abortSignal?: AbortSignal) { + async _parse(props: Props = {}) { const data: { [key: string]: any } = { csi: true, maxBlockSize: 1 << 16 } - const bytes = await unzip((await this.filehandle.readFile({ signal: abortSignal })) as Buffer) +
const { signal, statusCallback } = props + + if (statusCallback) { + statusCallback(1, 'Downloading file') + } + const bytes = await unzip((await this.filehandle.readFile(props)) as Buffer) + + if (statusCallback) { + statusCallback(1, 'Parsing index') + } // check TBI magic numbers if (bytes.readUInt32LE(0) === CSI1_MAGIC) { @@ -121,7 +130,7 @@ export default class CSI extends IndexFile { data.indices = new Array(data.refCount) let currOffset = 16 + auxLength + 4 for (let i = 0; i < data.refCount; i += 1) { - await abortBreakPoint(abortSignal) + await abortBreakPoint(signal) // the binning index const binCount = bytes.readInt32LE(currOffset) currOffset += 4 @@ -153,6 +162,9 @@ export default class CSI extends IndexFile { data.indices[i] = { binIndex, stats } } + if (statusCallback) { + statusCallback(1, 'Done parsing index') + } return data } @@ -167,12 +179,12 @@ export default class CSI extends IndexFile { return { lineCount } } - async blocksForRange(refId: number, beg: number, end: number, opts: Record<string, any> = {}): Promise<Chunk[]> { + async blocksForRange(refId: number, beg: number, end: number, opts: { signal?: AbortSignal } = {}): Promise<Chunk[]> { if (beg < 0) { beg = 0 } - const indexData = await this.parse(opts.signal) + const indexData = await this.parse(opts) if (!indexData) { return [] } diff --git a/src/indexFile.ts b/src/indexFile.ts index 7b8866c..5815084 100644 --- a/src/indexFile.ts +++ b/src/indexFile.ts @@ -4,6 +4,10 @@ import { GenericFilehandle } from 'generic-filehandle' import VirtualOffset from './virtualOffset' import Chunk from './chunk' +export interface Props { + signal?: AbortSignal + statusCallback?: Function +} export default abstract class IndexFile { public filehandle: GenericFilehandle public renameRefSeq: Function @@ -22,20 +26,23 @@ export default abstract class IndexFile { }) { this.filehandle = filehandle this.renameRefSeq = renameRefSeq + this._parseCache = new AbortablePromiseCache({ + cache: new QuickLRU({ maxSize: 1 }), + fill: (data: Props) =>
{ + return this._parse(data) + }, + }) } - public abstract async lineCount(refId: number): Promise<number> - protected abstract async _parse(signal?: AbortSignal): Promise<any> + public abstract async lineCount(refId: number, props: Props): Promise<number> + + protected abstract async _parse(props: Props): Promise<any> + public abstract async indexCov( - refId: number, - start?: number, - end?: number, + query: { seqId: number; start?: number; end?: number }, + props: Props, ): Promise<{ start: number; end: number; score: number }[]> - public abstract async blocksForRange( - chrId: number, - start: number, - end: number, - opts: Record<string, any>, - ): Promise<Chunk[]> + + public abstract async blocksForRange(chrId: number, start: number, end: number, opts: Props): Promise<Chunk[]> _findFirstData(data: any, virtualOffset: VirtualOffset) { const currentFdl = data.firstDataLine @@ -46,23 +53,17 @@ export default abstract class IndexFile { } } - async parse(abortSignal?: AbortSignal) { - if (!this._parseCache) { - this._parseCache = new AbortablePromiseCache({ - cache: new QuickLRU({ maxSize: 1 }), - fill: (data: any, signal: AbortSignal) => this._parse(signal), - }) - } - return this._parseCache.get('index', null, abortSignal) + parse(props: Props = {}) { + return this._parseCache.get('index', props, props.signal) } /** * @param {number} seqId - * @param {AbortSignal} [abortSignal] + * @param {Props} [props] optional signal and statusCallback * @returns {Promise<boolean>} true if the index contains entries for * the given reference sequence ID, false otherwise */ - async hasRefSeq(seqId: number, abortSignal?: AbortSignal) { - return !!((await this.parse(abortSignal)).indices[seqId] || {}).binIndex + async hasRefSeq(seqId: number, props: Props = {}) { + return !!((await this.parse(props)).indices[seqId] || {}).binIndex } } diff --git a/test/bai.test.js b/test/bai.test.js index d3a5abc..ab2967e 100644 --- a/test/bai.test.js +++ b/test/bai.test.js @@ -52,7 +52,7 @@ describe('index human data', () => { ), }) const aborter = new HalfAbortController() - const
indexDataP = ti.parse(aborter.signal) + const indexDataP = ti.parse({ signal: aborter.signal }) aborter.abort() await expect(indexDataP).rejects.toThrow(/aborted/) }) @@ -66,7 +66,7 @@ describe('bam header', () => { expect(ti.header).toEqual('@SQ SN:ctgA LN:50001\n') expect(ti.chrToIndex.ctgA).toEqual(0) expect(ti.indexToChr[0]).toEqual({ refName: 'ctgA', length: 50001 }) - const ret = await ti.indexCov('ctgA') + const ret = await ti.indexCov({ seqName: 'ctgA' }) expect(ret).toMatchSnapshot() }) it('loads volvox-sorted.bam with csi index', async () => { @@ -105,7 +105,11 @@ describe('bam records', () => { ) }) it('gets features from the end of volvox-sorted.bam', async () => { - const records = await ti.getRecordsForRange('ctgA', 47457, 50001) + const records = await ti.getRecordsForRange('ctgA', 47457, 50001, { + statusCallback: props => { + console.log(props) + }, + }) expect(records.length).toEqual(473) }) it('gets out of bounds from volvox-sorted.bam', async () => { @@ -375,9 +379,9 @@ describe('large indexcov', () => { const ti = new BAI({ filehandle: new LocalFile(require.resolve('./data/HG00096_illumina_lowcov.bam.bai')), }) - const ret = await ti.indexCov(10, 0, 1000000) + const ret = await ti.indexCov({ seqId: 10, start: 0, end: 1000000 }) expect(ret).toMatchSnapshot() - const empty = await ti.indexCov(0) + const empty = await ti.indexCov({ seqId: 0 }) expect(empty).toEqual([]) }) }) diff --git a/test/csi.test.js b/test/csi.test.js index be0bf96..49254bf 100644 --- a/test/csi.test.js +++ b/test/csi.test.js @@ -22,7 +22,7 @@ describe('bam header', () => { expect(ti.header).toEqual('@SQ SN:ctgA LN:50001\n') expect(ti.chrToIndex.ctgA).toEqual(0) expect(ti.indexToChr[0]).toEqual({ refName: 'ctgA', length: 50001 }) - const ret = await ti.indexCov('ctgA') + const ret = await ti.indexCov({ seqName: 'ctgA' }) expect(ret).toMatchSnapshot() }) it('loads volvox-sorted.bam with csi index', async () => {