Skip to content

Commit

Permalink
feat(fetch): add browser/node http client (#15)
Browse files Browse the repository at this point in the history
Add browser and Node.js support for fetching HTTP requests.
  • Loading branch information
enddynayn authored Jun 18, 2021
1 parent a4f425a commit ba60652
Show file tree
Hide file tree
Showing 6 changed files with 1,784 additions and 33 deletions.
45 changes: 20 additions & 25 deletions lib/reader.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ const parquet_types = require('./types');
const BufferReader = require('./bufferReader');
const bloomFilterReader = require('./bloomFilterIO/bloomFilterReader');
const groupBy = require("lodash/groupBy");
const fetch = require('cross-fetch');

const {
getBloomFiltersFor,
Expand Down Expand Up @@ -125,8 +126,8 @@ class ParquetReader {
* a `url` property.
* This function returns a new parquet reader
*/
static async openUrl(request, params, options) {
let envelopeReader = await ParquetEnvelopeReader.openUrl(request, params, options);
static async openUrl(params, options) {
let envelopeReader = await ParquetEnvelopeReader.openUrl(params, options);
return this.openEnvelopeReader(envelopeReader, options);
}

Expand All @@ -136,7 +137,9 @@ class ParquetReader {
}
try {
await envelopeReader.readHeader();

let metadata = await envelopeReader.readFooter();

return new ParquetReader(metadata, envelopeReader, opts);
} catch (err) {
await envelopeReader.close();
Expand Down Expand Up @@ -373,7 +376,7 @@ class ParquetEnvelopeReader {
return new ParquetEnvelopeReader(readFn, closeFn, fileStat, options);
}

static async openUrl(request, params, options) {
static async openUrl(params, options) {
if (typeof params === 'string')
params = {url: params};
if (!params.url)
Expand All @@ -382,34 +385,23 @@ class ParquetEnvelopeReader {
let base = params.url.split('/');
base = base.slice(0, base.length-1).join('/')+'/';

params.encoding = params.encoding || null;

let defaultHeaders = params.headers || {};

let filesize = async () => new Promise( (resolve, reject) => {
let req = request(params);
req.on('response', res => {
req.abort();
resolve(res.headers['content-length']);
});
req.on('error', reject);
});
let filesize = async () => {

const { headers } = await fetch(params.url);
return headers.get('Content-Length');
};

let readFn = (offset, length, file) => {
let readFn = async (offset, length, file) => {
let url = file ? base+file : params.url;

let range = `bytes=${offset}-${offset+length-1}`;
let headers = Object.assign({}, defaultHeaders, {range});
let req = Object.assign({}, params, {headers, url});
return new Promise( (resolve, reject) => {
request(req, (err, res) => {
if (err) {
reject(err);
} else {
resolve(res.body);
}
});
});
const response = await fetch(url, { headers });
const arrayBuffer = await response.arrayBuffer();
const buffer = Buffer.from(arrayBuffer);

return buffer;
};

let closeFn = () => ({});
Expand All @@ -436,6 +428,7 @@ class ParquetEnvelopeReader {

readHeader() {
return this.read(0, PARQUET_MAGIC.length).then(buf => {

if (buf.toString() != PARQUET_MAGIC) {
throw 'not valid parquet file'
}
Expand Down Expand Up @@ -623,7 +616,9 @@ class ParquetEnvelopeReader {
if (typeof this.fileSize === 'function') {
this.fileSize = await this.fileSize();
}

let trailerLen = PARQUET_MAGIC.length + 4;

let trailerBuf = await this.read(this.fileSize - trailerLen, trailerLen);

if (trailerBuf.slice(4).toString() != PARQUET_MAGIC) {
Expand Down
Loading

0 comments on commit ba60652

Please sign in to comment.