SPL practice: data flow when speeding up batch jobs
Speeding up batch jobs is one of the major optimization scenarios for SPL, and dumping the batch job’s data into SPL’s high-performance files is an important step in that optimization. The data to be dumped usually falls into two parts: historical cold data and periodic incremental data (added, deleted, or modified records). This article shows how to dump and compute on both parts, and how to perform periodic updates and regular reorganization.
The composite table is a high-performance storage format provided by SPL. Its principle is to sort the data in advance and then store it compactly in compressed form. As a result, a composite table occupies less space, and records can be located quickly by exploiting the order of the data.
Let’s take the ORDERS of TPC-H as an example. The code to dump the historical data from database to composite table is as follows:
| | A | B |
|---|---|---|
| 1 | fork 4.() | =connect@l("oracle12c") |
| 2 | | =B1.cursor@x("SELECT O_ORDERKEY,O_CUSTKEY,O_ORDERSTATUS,O_TOTALPRICE,O_ORDERDATE,O_ORDERPRIORITY,O_CLERK,O_SHIPPRIORITY,O_COMMENT FROM ORDERS WHERE MOD(O_ORDERKEY,4)="/(A1-1)/" ORDER BY 1") |
| 3 | | =file("orders"/A1/".btx").export@b(B2) |
| 4 | =directory("orders?.btx") | |
| 5 | =A4.(file(~).cursor@b()).merge(#1) | |
| 6 | =file("hisdata.ctx").create(#o_orderkey,o_custkey,o_orderstatus,o_totalprice,o_orderdate,o_orderpriority,o_clerk,o_shippriority,o_comment) | |
| 7 | >A6.append@i(A5) | |
| 8 | =A4.(movefile(~)) | |
Executing this code will generate a composite table file ‘hisdata.ctx’ ordered by the primary key O_ORDERKEY.
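The dump above splits the source rows into four partitions by key modulo 4, writes each partition in key order, and then merges the partitions back into one ordered stream. The following Python sketch is not SPL; it only models that partition-then-merge logic on in-memory tuples, and the function names `dump_partitions` and `merge_partitions` are hypothetical.

```python
import heapq

def dump_partitions(rows, n=4):
    """Split rows into n partitions by key modulo n; each partition is sorted by key."""
    parts = [[] for _ in range(n)]
    for row in rows:
        parts[row[0] % n].append(row)
    return [sorted(p, key=lambda r: r[0]) for p in parts]

def merge_partitions(parts):
    """K-way merge of key-ordered partitions into one key-ordered list."""
    return list(heapq.merge(*parts, key=lambda r: r[0]))

# Toy rows: (order key, order status)
rows = [(7, "O"), (2, "F"), (5, "O"), (4, "F"), (1, "O"), (8, "F")]
merged = merge_partitions(dump_partitions(rows))
print([r[0] for r in merged])
```

Because every partition is already sorted, the merge step only compares the current head of each partition, so the full data set never needs to be sorted in one piece.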
When the primary key value of incremental data is just after the maximum primary key value of historical data, the incremental data can be directly appended to the historical data.
For example, the maximum value of the primary key ‘O_ORDERKEY’ in the historical data composite table ‘hisdata.ctx’ is 6000000, and the primary key values of the incremental data ‘newdata.csv’ start at 6000001 and increase from there:
O_ORDERKEY | O_CUSTKEY | O_ORDERSTATUS | … |
---|---|---|---|
1 | 36901 | O | … |
2 | 78002 | O | … |
32 | 130057 | O | … |
… | … | … | … |
hisdata.ctx
O_ORDERKEY | O_CUSTKEY | O_ORDERSTATUS | … |
---|---|---|---|
6000001 | 666 | O | … |
6000011 | 54321 | F | … |
6000012 | 12345 | O | … |
… | … | … | … |
newdata.csv
SPL code:
| | A |
|---|---|
| 1 | =file("newdata.csv").cursor@ct() |
| 2 | =file("hisdata.ctx").open().append@i(A1) |
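Direct appending is only safe when every incremental key is larger than the current maximum key, so that the table stays ordered. The Python sketch below models that precondition on a plain list. It is an illustration under that assumption, not a description of how SPL checks it, and `append_ordered` is a hypothetical name.

```python
def append_ordered(table, new_rows):
    """Append new_rows to a key-ordered table, but only if the keys stay ascending."""
    last_key = table[-1][0] if table else float("-inf")
    keys = [last_key] + [row[0] for row in new_rows]
    # Validate everything first so a bad batch never leaves a half-appended table.
    if any(a >= b for a, b in zip(keys, keys[1:])):
        raise ValueError("incremental keys must be ascending and above the current maximum")
    table.extend(new_rows)
    return table

history = [(5999999, "O"), (6000000, "F")]
append_ordered(history, [(6000001, "O"), (6000011, "F"), (6000012, "O")])
print(len(history))
```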
When the amount of historical data is small, we just need to merge the incremental data with the original table.
For example, the maximum value of the primary key ‘O_ORDERKEY’ of the historical data composite table ‘hisdata.ctx’ is 6000000, and there are primary key values smaller than 6000000 in the incremental data ‘newdata.csv’:
O_ORDERKEY | O_CUSTKEY | O_ORDERSTATUS | … |
---|---|---|---|
1 | 36901 | O | … |
2 | 78002 | O | … |
32 | 130057 | O | … |
… | … | … | … |
hisdata.ctx
O_ORDERKEY | O_CUSTKEY | O_ORDERSTATUS | … |
---|---|---|---|
3 | 444 | F | … |
500 | 66666 | O | … |
6000012 | 12345 | O | … |
… | … | … | … |
newdata.csv
SPL code:
| | A |
|---|---|
| 1 | =file("newdata.csv").cursor@ct() |
| 2 | =file("hisdata.ctx").reset(;A1) |
As the amount of historical data increases over time, the time to merge with the original table becomes longer and longer. To solve this, we can divide the data composite table file into two parts: the historical data composite table ‘hisdata.ctx’ and the incremental data composite table ‘newdata.ctx’. In this way, we only need to append data to the incremental data composite table, and merge the incremental data with historical data after a period of time. For example, the incremental data needs to be updated daily and reorganized monthly:
| | A | B |
|---|---|---|
| 1 | if day(now())==1 | =file("hisdata.ctx").reset(;file("newdata.ctx").open().cursor()) |
| 2 | | =file("newdata.ctx").create@y(#o_orderkey,o_custkey,…) |
| 3 | =file("newdata.ctx").reset(;file("newdata.csv").cursor@ct()) | |
When reading data, the historical data composite table must be merged with the incremental data composite table, for example:
=[file("newdata.ctx").open().cursor(),file("hisdata.ctx").open().cursor()].merge(#1)
It should be noted that the cursor sequence should be in the same order as data files, that is, [incremental data, historical data].
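The daily-update, monthly-reorganization scheme can be modeled with two ordered lists: a large history that is rewritten only once a month, and a small delta that absorbs each day's new rows. The Python sketch below is an illustration of that idea rather than SPL; the class `TwoTierStore` and its methods are hypothetical names, and it assumes the new rows only insert keys that do not yet exist.

```python
import heapq
from datetime import date

def merge_by_key(a, b):
    """Merge two key-ordered row lists into one key-ordered list."""
    return list(heapq.merge(a, b, key=lambda r: r[0]))

class TwoTierStore:
    def __init__(self, history):
        self.history = sorted(history, key=lambda r: r[0])  # large, rarely rewritten
        self.delta = []                                     # small, rewritten daily

    def daily_update(self, new_rows, today):
        if today.day == 1:
            # Monthly reorganization: fold the delta into the history, then reset it.
            self.history = merge_by_key(self.history, self.delta)
            self.delta = []
        # Daily update: only the small delta is merged and rewritten.
        self.delta = merge_by_key(self.delta, sorted(new_rows, key=lambda r: r[0]))

    def read(self):
        # Readers always see the ordered union of both parts.
        return merge_by_key(self.delta, self.history)

store = TwoTierStore([(1, "O"), (2, "O"), (32, "O")])
store.daily_update([(3, "F")], date(2024, 1, 15))
store.daily_update([(500, "O")], date(2024, 2, 1))
print([r[0] for r in store.read()])
```

The cost of a daily update depends on the size of the delta only, while the expensive merge with the full history happens once per reorganization period.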
If the amount of data is even larger, the data may also need to be divided into zones and stored as a multi-zone composite table. For details, see https://c.scudata.com/article/1700643363322.
The modification and deletion of incremental data usually do not involve very large amounts of data, so we only introduce the update method on a single composite table. If the amount of data is huge and multiple composite tables are needed, the multi-zone composite table can be used. For details, visit: https://c.scudata.com/article/1700643363322.
When the incremental data contains modifications, the historical data needs to be updated by primary key.
For example, the maximum value of the primary key ‘O_ORDERKEY’ of the historical data composite table ‘hisdata.ctx’ is 6000000, and there is an incremental data ‘newdata.csv’:
O_ORDERKEY | O_CUSTKEY | O_ORDERSTATUS | … |
---|---|---|---|
1 | 36901 | O | … |
2 | 78002 | O | … |
32 | 130057 | O | … |
… | … | … | … |
hisdata.ctx
O_ORDERKEY | O_CUSTKEY | O_ORDERSTATUS | … |
---|---|---|---|
1 | 1111 | F | … |
999 | 9999 | O | … |
6000012 | 12345 | O | … |
… | … | … | … |
newdata.csv
SPL code:
| | A |
|---|---|
| 1 | =file("newdata.csv").cursor@ct() |
| 2 | =file("hisdata.ctx").reset@w(;A1) |
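Updating by primary key can be pictured as an ordered merge in which an incremental row replaces the historical row with the same key, and rows with new keys are inserted in order. The Python sketch below models that behavior with the sample rows from the two tables; it is an assumption-level illustration, not SPL's implementation, and `merge_update` is a hypothetical name.

```python
def merge_update(history, increment):
    """Merge two key-ordered row lists; on equal keys the incremental row wins."""
    out, i, j = [], 0, 0
    while i < len(history) and j < len(increment):
        hist_key, inc_key = history[i][0], increment[j][0]
        if hist_key < inc_key:
            out.append(history[i])
            i += 1
        elif hist_key > inc_key:
            out.append(increment[j])    # new key: insert
            j += 1
        else:
            out.append(increment[j])    # same key: update
            i += 1
            j += 1
    out.extend(history[i:])
    out.extend(increment[j:])
    return out

history = [(1, 36901, "O"), (2, 78002, "O"), (32, 130057, "O")]
increment = [(1, 1111, "F"), (999, 9999, "O"), (6000012, 12345, "O")]
for row in merge_update(history, increment):
    print(row)
```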
The historical data is the same as above. The difference is that the incremental data ‘newdata.csv’ has a status column ‘STATUS’ after its primary key ‘O_ORDERKEY’. This column records one of four statuses: B (primary key change), D (delete), A (update), and I (insert). B and I always appear in pairs, for example:
O_ORDERKEY | STATUS | O_CUSTKEY | O_ORDERSTATUS | … |
---|---|---|---|---|
1 | B | 36901 | O | … |
6000001 | I | 36902 | F | … |
2 | A | 123314 | F | … |
6000001 | A | 36902 | O | … |
33 | D | 136777 | O | … |
6000002 | I | 88888 | O | … |
… | … | … | … | … |
6000002 | A | 88888 | F | … |
… | … | … | … | … |
In this case, a deletion flag column must be added after the dimension field when designing the composite table (the field can be named arbitrarily; we name it DEL here). Its values are false and true, representing a valid record and an invalid record respectively, and the default value is false. The following table is the historical data composite table ‘hisdata.ctx’:
O_ORDERKEY | DEL | O_CUSTKEY | O_ORDERSTATUS | … |
---|---|---|---|---|
1 | false | 36901 | O | … |
2 | false | 78002 | O | … |
32 | false | 130057 | O | … |
33 | false | 66958 | F | … |
34 | false | 61001 | O | … |
… | … | … | … | … |
Note that when creating a composite table with a deletion flag, the @d option must be used (create@d) so that the first column after the dimension is treated as the deletion flag column.
Since the incremental data ‘newdata.csv’ describes each change by status, it is not difficult to convert it to the following form, with one row per primary key and a DEL value derived from the status:
O_ORDERKEY | DEL | O_CUSTKEY | O_ORDERSTATUS | … |
---|---|---|---|---|
1 | true | 36901 | O | … |
2 | false | 123314 | F | … |
33 | true | 136777 | O | … |
… | … | … | … | … |
6000001 | false | 36902 | O | … |
6000002 | false | 88888 | F | … |
… | … | … | … | … |
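One way to perform this conversion is to walk through the change rows in order, keep only the last change per primary key, and map the status to the flag: B and D become true, while A and I become false. The Python sketch below applies that rule to the sample rows shown earlier. The rule is an assumption inferred from the two tables, and `to_del_form` is a hypothetical name.

```python
def to_del_form(changes):
    """Collapse a status-tagged change log into one (key, DEL, fields...) row per key."""
    latest = {}
    for key, status, *fields in changes:
        # B (old key of a key change) and D (delete) both invalidate the record.
        deleted = status in ("B", "D")
        latest[key] = (key, deleted, *fields)   # a later change overwrites an earlier one
    return [latest[k] for k in sorted(latest)]  # keep primary key order

changes = [
    (1, "B", 36901, "O"),
    (6000001, "I", 36902, "F"),
    (2, "A", 123314, "F"),
    (6000001, "A", 36902, "O"),
    (33, "D", 136777, "O"),
    (6000002, "I", 88888, "O"),
    (6000002, "A", 88888, "F"),
]
for row in to_del_form(changes):
    print(row)
```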
SPL code:
| | A |
|---|---|
| 1 | =file("newdata.csv").cursor@ct() |
| 2 | =file("hisdata.ctx").reset@w(;A1) |
After merging into a composite table that has a deletion flag column, the records whose deletion flag is true are deleted from the composite table. When reading the composite table, the deletion flag column can be left out of the cursor, for example: =file("hisdata.ctx").open().cursor(o_orderkey,o_custkey,o_orderstatus,…)
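The effect of merging flagged rows can be modeled as follows: an incremental row overrides the historical row with the same key, and any row whose flag is true is dropped from the result. This Python sketch uses the sample values from the tables above and is only an illustration of the described outcome, not of SPL's internals; `apply_flagged` is a hypothetical name.

```python
def apply_flagged(history, increment):
    """Apply (key, DEL, fields...) rows to history and drop the rows flagged as deleted."""
    rows = {row[0]: row for row in history}
    rows.update({row[0]: row for row in increment})  # the incremental row wins per key
    return [rows[k] for k in sorted(rows) if not rows[k][1]]

history = [
    (1, False, 36901, "O"),
    (2, False, 78002, "O"),
    (32, False, 130057, "O"),
    (33, False, 66958, "F"),
    (34, False, 61001, "O"),
]
increment = [
    (1, True, 36901, "O"),
    (2, False, 123314, "F"),
    (33, True, 136777, "O"),
    (6000001, False, 36902, "O"),
    (6000002, False, 88888, "F"),
]
result = apply_flagged(history, increment)
print([row[0] for row in result])
```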