Add ability to read decimal columns (#79)
Problem
=======
Parquet files often have columns of type `decimal`, but `decimal` column
types are currently not supported for reading.

Solution
========
I implemented the code required to properly read (but not write) decimal
columns, without any external libraries.
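The decoding rule itself is the standard parquet one: a `DECIMAL` value is
stored as an unscaled integer (read as a 32-bit integer when the column's
`precision` is 9 or less, and as a 64-bit integer above that), and the
logical value is the unscaled integer divided by 10^scale. Here is a
minimal sketch of that rule; it is illustrative only, and `applyScale` is a
made-up name — the real implementation is `decodeValues_DECIMAL` in
`lib/codec/plain.ts` in the diff below:
```
// Illustrative sketch only -- see decodeValues_DECIMAL in
// lib/codec/plain.ts for the real implementation. A DECIMAL is stored
// as an unscaled integer; the logical value is unscaled / 10^scale.
function applyScale (unscaled, scale) {
  return Number(unscaled) / Math.pow(10, scale)
}

// a column with precision 10 and scale 7 storing 1180297106 decodes to:
console.log(applyScale(1180297106n, 7)) // => 118.0297106
```
Precision above 18 is rejected on the schema side because values are read
with at most a single 64-bit read.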

Change summary:
---------------
* I made a lot of commits, as this required some serious trial and error
* modified `lib/codec/types.ts` to allow `precision` and `scale`
properties on the `Options` interface, for use when decoding column data
* modified `lib/declare.ts` to allow `DECIMAL` in `OriginalType`, and
modified `FieldDefinition` and `ParquetField` to include `precision` and
`scale`
* modified `decodeValues_INT32` and `decodeValues_INT64` in
`lib/codec/plain.ts` to take options, so they can determine the column
type and, if it is `DECIMAL`, call the new `decodeValues_DECIMAL`
function, which uses the `precision` and `scale` configured in the
options object to decode the column
* modified `lib/reader.ts` to set the `originalType`, `precision`,
`scale`, and name in `decodePage`, as well as `precision` and `scale` in
`decodeSchema`, so that data read from the parquet file is available
when decoding a Decimal column
* modified `lib/schema.ts` to validate what a parquet file must provide
for a decimal column in order to process it properly, and to pass along
the `precision` and `scale` if those options exist on a column (see the
schema example after this list)
* added the `DECIMAL` configuration to `PARQUET_LOGICAL_TYPES`
* updated `test/decodeSchema.js` to expect `precision` and `scale` of
`null`, as they are now set to `null` for non-decimal types
* added some Decimal-specific tests in `test/reader.js` and
`test/schema.js`
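For illustration, this is what a valid decimal column definition looks
like under the new validation. It mirrors the new tests in
`test/schema.js`; the column name `my_decimal` is made up:
```
const parquet = require('./dist/parquet')

// Hypothetical column, for illustration only. Both precision and scale
// are required for DECIMAL, and precision over 18 is rejected.
const schema = new parquet.ParquetSchema({
  my_decimal: { type: 'DECIMAL', precision: 10, scale: 7 },
})

// Omitting scale or precision, or using precision > 18, throws an error
// such as:
// invalid schema for type: DECIMAL, for Column: my_decimal, precision is required
```
Non-decimal columns are unaffected; their `precision` and `scale` simply
come back as `null`.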

Steps to Verify:
----------------
1. Take this code and paste it into a file at the root of the repo with
the `.js` extension:
```
const parquet = require('./dist/parquet')

async function main () {
    const file = './test/test-files/valid-decimal-columns.parquet'
    await _readParquetFile(file)
}

async function _readParquetFile (filePath) {
    const reader = await parquet.ParquetReader.openFile(filePath)
    console.log(reader.schema)
    let cursor = reader.getCursor()
    const columnListFromFile = []
    cursor.schema.fieldList.forEach((rec, i) => {
        columnListFromFile.push({
            name: rec.name,
            type: rec.originalType
        })
    })

    let record = null
    let count = 0
    const previewData = []
    const columnsToRead = columnListFromFile.map(col => col.name)
    cursor = reader.getCursor(columnsToRead)
    console.log('-------------------- data --------------------')
    while (record = await cursor.next()) {
        previewData.push(record)
        console.log(`Row: ${count}`)
        console.log(record)
        count++
    }
    await reader.close()
}

main()
    .catch(error => {
        console.error(error)
        process.exit(1)
    })

```
2. Run the code in a terminal using `node <your file name>.js`
3. Verify that the schema indicates 4 columns, including `over_9_digits`
with scale 7 and precision 10, and `under_9_digits` with scale 4 and
precision 6.
4. The values of those columns should match this table:
![Screenshot 2023-04-22 at 16 53 33](https://user-images.githubusercontent.com/2294003/233810916-3d1a37da-ef22-4e1c-8e46-9961d7470e5e.png)
dgaudet authored Apr 26, 2023 · 1 parent a011a2e · commit 2c733b5
Showing 10 changed files with 175 additions and 23 deletions.
76 changes: 66 additions & 10 deletions lib/codec/plain.ts
@@ -35,12 +35,21 @@ function encodeValues_INT32(values: Array<number>) {
   return buf;
 }
 
-function decodeValues_INT32(cursor: Cursor, count: number) {
+function decodeValues_INT32(cursor: Cursor, count: number, opts: Options) {
   let values = [];
-
-  for (let i = 0; i < count; ++i) {
-    values.push(cursor.buffer.readInt32LE(cursor.offset));
-    cursor.offset += 4;
+  const name = opts.name || opts.column?.name || undefined;
+  try {
+    if (opts.originalType === 'DECIMAL') {
+      values = decodeValues_DECIMAL(cursor, count, opts);
+    } else {
+      for (let i = 0; i < count; ++i) {
+        values.push(cursor.buffer.readInt32LE(cursor.offset));
+        cursor.offset += 4;
+      }
+    }
+  } catch (e) {
+    console.log(`Error thrown for column: ${name}`);
+    throw e;
   }
 
   return values;
@@ -55,12 +64,59 @@ function encodeValues_INT64(values: Array<number>) {
   return buf;
 }
 
-function decodeValues_INT64(cursor: Cursor, count: number) {
+function decodeValues_INT64(cursor: Cursor, count: number, opts: Options) {
   let values = [];
+  const name = opts.name || opts.column?.name || undefined;
+  try {
+    if (opts.originalType === 'DECIMAL' || opts.column?.originalType === 'DECIMAL') {
+      let columnOptions: any = opts.column?.originalType ? opts.column : opts;
+      values = decodeValues_DECIMAL(cursor, count, columnOptions);
+    } else {
+      for (let i = 0; i < count; ++i) {
+        values.push(cursor.buffer.readBigInt64LE(cursor.offset));
+        cursor.offset += 8;
+      }
+    }
+  } catch (e) {
+    console.log(`Error thrown for column: ${name}`);
+    throw e;
+  }
 
+  return values;
+}
+
+function decodeValues_DECIMAL(cursor: Cursor, count: number, opts: Options) {
+  let {
+    scale,
+    precision
+  } = opts;
+
+  const name = opts.name || undefined
+  if (!scale) {
+    throw `missing option: scale (required for DECIMAL) for column: ${name}`;
+  }
+  if (!precision) {
+    throw `missing option: precision (required for DECIMAL) for column: ${name}`;
+  }
+
+  let values = [];
+
+  // by default we prepare the offset and bufferFunction to work with 32bit integers
+  let offset = 4;
+  let bufferFunction: any = (offset: number) => cursor.buffer.readInt32LE(offset);
+  if (precision > 9) {
+    // if the precision is over 9 digits, then we are dealing with a 64bit integer
+    offset = 8;
+    bufferFunction = (offset: number) => cursor.buffer.readBigInt64LE(offset);
+  }
   for (let i = 0; i < count; ++i) {
-    values.push(cursor.buffer.readBigInt64LE(cursor.offset));
-    cursor.offset += 8;
+    const bufferSize = cursor.size || 0
+    if (bufferSize === 0 || cursor.offset < bufferSize) {
+      const fullValue = bufferFunction(cursor.offset);
+      const valueWithDecimalApplied = Number(fullValue) / Math.pow(10, scale);
+      values.push(valueWithDecimalApplied);
+      cursor.offset += offset;
+    }
   }
 
   return values;
@@ -266,10 +322,10 @@ export const decodeValues = function (
       return decodeValues_BOOLEAN(cursor, count);
 
     case "INT32":
-      return decodeValues_INT32(cursor, count);
+      return decodeValues_INT32(cursor, count, opts);
 
     case "INT64":
-      return decodeValues_INT64(cursor, count);
+      return decodeValues_INT64(cursor, count, opts);
 
     case "INT96":
       return decodeValues_INT96(cursor, count);
3 changes: 3 additions & 0 deletions lib/codec/types.ts
@@ -18,6 +18,9 @@ export interface Options {
   rLevelMax?: number,
   dLevelMax?: number,
   type?: string,
+  name?: string,
+  precision?: number,
+  scale?: number
 }
 
 export interface Cursor {
6 changes: 5 additions & 1 deletion lib/declare.ts
@@ -29,7 +29,7 @@ export type OriginalType =
   // | 'MAP_KEY_VALUE' // 2
   | 'LIST' // 3
   | 'ENUM' // 4
-  // | 'DECIMAL' // 5
+  | 'DECIMAL' // 5
   | 'DATE' // 6
   | 'TIME_MILLIS' // 7
   | 'TIME_MICROS' // 8
@@ -62,6 +62,8 @@ export interface FieldDefinition {
   statistics?: Statistics | false;
   parent?: ParentField
   num_children?: NumChildrenField
+  precision?: number
+  scale?: number
 }
 
 export interface ParquetField {
@@ -74,6 +76,8 @@
   typeLength?: number;
   encoding?: ParquetCodec;
   compression?: ParquetCompression;
+  precision?: number;
+  scale?: number;
   rLevelMax: number;
   dLevelMax: number;
   isNested?: boolean;
10 changes: 8 additions & 2 deletions lib/reader.ts
@@ -938,7 +938,11 @@ async function decodeDataPage(cursor: Cursor, header: parquet_thrift.PageHeader,
     {
       typeLength: opts.column!.typeLength!,
      bitWidth: opts.column!.typeLength!,
-      disableEnvelope: opts.column!.disableEnvelope
+      disableEnvelope: opts.column!.disableEnvelope,
+      originalType: opts.column!.originalType,
+      precision: opts.column!.precision,
+      scale: opts.column!.scale,
+      name: opts.column!.name
     });
 
   cursor.offset = cursorEnd;
@@ -1084,7 +1088,9 @@ function decodeSchema(schemaElements: Array<parquet_thrift.SchemaElement>) {
     type: logicalType as ParquetType,
     typeLength: schemaElement.type_length,
     optional: optional,
-    repeated: repeated
+    repeated: repeated,
+    scale: schemaElement.scale,
+    precision: schemaElement.precision
   };
 }
 
36 changes: 31 additions & 5 deletions lib/schema.ts
@@ -1,7 +1,7 @@
 import * as parquet_codec from './codec';
 import * as parquet_compression from './compression'
 import * as parquet_types from './types'
-import { SchemaDefinition, ParquetField, RepetitionType } from './declare'
+import { SchemaDefinition, ParquetField, RepetitionType, FieldDefinition } from './declare'
 
 const PARQUET_COLUMN_KEY_SEPARATOR = '.';
 
@@ -119,10 +119,10 @@ function buildFields(schema: SchemaDefinition, rLevelParentMax?: number, dLevelP
       statistics: opts.statistics,
       fieldCount: Object.keys(opts.fields).length,
       fields: buildFields(
-          opts.fields,
-          rLevelMax,
-          dLevelMax,
-          path.concat(name))
+        opts.fields,
+        rLevelMax,
+        dLevelMax,
+        path.concat(name))
     };
 
     if (opts.type == 'LIST' || opts.type == 'MAP') fieldList[name].originalType = opts.type;
@@ -158,6 +158,10 @@ function buildFields(schema: SchemaDefinition, rLevelParentMax?: number, dLevelP
       fieldErrors.push(`Unsupported compression method: ${opts.compression}, for Column: ${nameWithPath}`);
     }
 
+    if (typeDef.originalType === 'DECIMAL') {
+      fieldErrors = fieldErrors.concat(errorsForDecimalOpts(typeDef.originalType, opts, nameWithPath));
+    }
+
     /* add to schema */
     fieldList[name] = {
       name: name,
@@ -168,6 +172,8 @@ function buildFields(schema: SchemaDefinition, rLevelParentMax?: number, dLevelP
       encoding: opts.encoding,
       statistics: opts.statistics,
       compression: opts.compression,
+      precision: opts.precision,
+      scale: opts.scale,
       typeLength: opts.typeLength || typeDef.typeLength,
       rLevelMax: rLevelMax,
       dLevelMax: dLevelMax
@@ -199,3 +205,23 @@ function listFields(fields: Record<string, ParquetField>) {
 function isDefined<T>(val: T | undefined): val is T {
   return val !== undefined;
 }
+
+function errorsForDecimalOpts(type: string, opts: FieldDefinition, columnName: string): string[] {
+  const fieldErrors = []
+  if(!opts.precision) {
+    fieldErrors.push(
+      `invalid schema for type: ${type}, for Column: ${columnName}, precision is required`
+    );
+  }
+  else if (opts.precision > 18) {
+    fieldErrors.push(
+      `invalid precision for type: ${type}, for Column: ${columnName}, can not handle precision over 18`
+    );
+  }
+  if (!opts.scale) {
+    fieldErrors.push(
+      `invalid schema for type: ${type}, for Column: ${columnName}, scale is required`
+    );
+  }
+  return fieldErrors
+}
5 changes: 5 additions & 0 deletions lib/types.ts
@@ -133,6 +133,11 @@ export const PARQUET_LOGICAL_TYPES: ParquetTypeData = {
     originalType: 'INT_64',
     toPrimitive: toPrimitive_INT64
   },
+  'DECIMAL': {
+    primitiveType: 'INT64',
+    originalType: 'DECIMAL',
+    toPrimitive: toPrimitive_INT64
+  },
   'JSON': {
     primitiveType: 'BYTE_ARRAY',
     originalType: 'JSON',
16 changes: 12 additions & 4 deletions test/decodeSchema.js
@@ -157,7 +157,9 @@ describe('ParquetSchema', function() {
           "encoding": "PLAIN",
           "compression": "UNCOMPRESSED",
           "rLevelMax": 0,
-          "dLevelMax": 0
+          "dLevelMax": 0,
+          "precision": null,
+          "scale": null
         }
       }
     },
@@ -191,7 +193,9 @@ describe('ParquetSchema', function() {
           "encoding": "PLAIN",
           "compression": "UNCOMPRESSED",
           "rLevelMax": 0,
-          "dLevelMax": 0
+          "dLevelMax": 0,
+          "precision": null,
+          "scale": null
         },
         "g": {
           "name": "g",
@@ -209,7 +213,9 @@ describe('ParquetSchema', function() {
           "encoding": "PLAIN",
           "compression": "UNCOMPRESSED",
           "rLevelMax": 0,
-          "dLevelMax": 0
+          "dLevelMax": 0,
+          "precision": null,
+          "scale": null
         }
       }
     }
@@ -229,7 +235,9 @@ describe('ParquetSchema', function() {
           "encoding": "PLAIN",
           "compression": "UNCOMPRESSED",
           "rLevelMax": 0,
-          "dLevelMax": 0
+          "dLevelMax": 0,
+          "precision": null,
+          "scale": null
         }
       }
     }
22 changes: 21 additions & 1 deletion test/reader.js
@@ -107,5 +107,25 @@ describe("ParquetReader", () => {
 
       assert.equal(counter, 40000);
     })
-  })
+  });
+
+  describe("#handleDecimal", () => {
+    it("loads parquet with columns configured as DECIMAL", async () => {
+      const reader = await parquet.ParquetReader.openFile(
+        path.join(__dirname,'test-files','valid-decimal-columns.parquet')
+      );
+
+      const data = []
+      for await(const record of reader) {
+        data.push(record)
+      }
+
+      assert.equal(data.length, 4);
+      assert.equal(data[0].over_9_digits, 118.0297106);
+      assert.equal(data[1].under_9_digits, 18.7106);
+      // handling null values
+      assert.equal(data[2].over_9_digits, undefined);
+      assert.equal(data[2].under_9_digits, undefined);
+    })
+  });
 });
24 changes: 24 additions & 0 deletions test/schema.js
@@ -524,4 +524,28 @@ describe('ParquetSchema', function() {
     }, 'Unsupported compression method: UNKNOWN, for Column: quantity');
   });
 
+  it('should throw error given decimal with no precision', function() {
+    assert.throws(() => {
+      new parquet.ParquetSchema({
+        test_decimal_col: {type: 'DECIMAL', scale: 4},
+      })
+    }, 'invalid schema for type: DECIMAL, for Column: test_decimal_col, precision is required');
+  });
+
+  it('should throw error given decimal with no scale', function() {
+    assert.throws(() => {
+      new parquet.ParquetSchema({
+        test_decimal_col: {type: 'DECIMAL', precision: 4},
+      })
+    }, 'invalid schema for type: DECIMAL, for Column: test_decimal_col, scale is required');
+  });
+
+  it('should throw error given decimal with over 18 precision', function() {
+    assert.throws(() => {
+      new parquet.ParquetSchema({
+        decimal_column: {type: 'DECIMAL', precision: 19, scale: 5},
+      })
+    }, 'invalid precision for type: DECIMAL, for Column: decimal_column, can not handle precision over 18');
+  });
+
 });
Binary file added test/test-files/valid-decimal-columns.parquet
Binary file not shown.
