Routine for real-time data updating
This routine applies to scenarios similar to those of “Routine for real-time data appending”, except that the data need to be updated rather than appended.
It is applicable when the real-time requirement for data maintenance is very high, the update cycle is short, and data may be updated at any time; the data need to be stored in layers across multiple zone tables (hereinafter referred to as ‘tables’ unless otherwise specified) of a multi-zone composite table; and only the update mode is supported.
Key differences from append routine:
- New data must be sorted by the primary key field of the composite table. Received data are stamped with the current time and written directly to a layer-0 table.
- There is no chaos period, and no making up for missing data is involved.
- There is only one main table at the highest layer, and all data are eventually merged into it. In the append routine, by contrast, the highest layer may be divided into multiple tables by time.
- There is no time interval parameter when querying; all tables are merged every time a query is performed.
- Once new data are received, they are written to a layer-0 table that uses the current time as its table number. If records need to be deleted, add a deletion flag field with two values: true means deleting the record, and false means retaining it. Applications that never delete records simply set this field to null.
- Merge mode: when merging from layer n-1 to layer n, the update mechanism of the multi-zone composite table is adopted. The highest layer has only one main table, and a manual merge thread is used to merge data into it.
- Main table number: since there is only one main table, its table number alternates between 0 and 1. When the current table number is 0, the data are merged into table 1; when the current table number is 1, the data are merged into table 0.
- Query thread: it takes no query parameter; it combines all table numbers into one sequence in chronological order and returns it.
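To make the update and deletion semantics above more concrete, here is a minimal Python sketch. It is not the SPL implementation: it assumes that, for records sharing a primary key, a later table overrides an earlier one, and that a true deletion flag removes the record. The function name `merge_updates` and the sample fields are hypothetical.

```python
def merge_updates(tables, key_fields, delete_field=None):
    """Merge tables (oldest first) by primary key; later rows win.

    Assumption: a true value in delete_field removes the record.
    """
    merged = {}
    for table in tables:
        for row in table:
            key = tuple(row[k] for k in key_fields)
            if delete_field and row.get(delete_field):
                merged.pop(key, None)   # deletion flag set: drop the record
            else:
                merged[key] = row       # insert or overwrite
    # Return rows ordered by primary key, like the sorted composite table
    return [merged[k] for k in sorted(merged)]


main = [
    {"account": 1, "balance": 10, "deleted": False},
    {"account": 2, "balance": 20, "deleted": False},
]
updates = [
    {"account": 2, "balance": 25, "deleted": False},  # update
    {"account": 1, "balance": 0, "deleted": True},    # delete
    {"account": 3, "balance": 5, "deleted": False},   # insert
]
result = merge_updates([main, updates], ["account"], "deleted")
print([(r["account"], r["balance"]) for r in result])  # [(2, 25), (3, 5)]
```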
Configuration file: ubc.json
By default, this file is located in the main path of esProc (if you want to save it to another directory, modify the code to use an absolute path). The configuration file includes the following contents:
[{ "sortKey":"account,tdate",
"deleteField":"deleted",
"otherFields":"ColName1,ColName2",
"dataDir":"data/",
"dataFilename":"data.ctx",
"queryPeriod":120,
"level":[0,1,2,3,4,5] ,
"blockSize":[65536,131072,262144,524288,1048576,1048576],
"interval":[60,10,2,1],
"discardZone":[],
"discardTime":[]
}]
“sortKey” refers to the name of the primary key field. When there are multiple such fields, they are separated by commas.
“deleteField” refers to the name of the deletion flag field. If there is no deletion operation, fill in null.
“otherFields” refers to the name of other fields, and these fields are separated by commas.
“dataDir” refers to the storage path of composite table relative to the main directory.
“dataFilename” refers to the name of composite table file, such as “data.ctx”.
“queryPeriod” refers to the maximum query period measured in seconds. When a certain table has been discarded for a time length longer than this period, the table will be deleted.
“level” refers to the layer level. Its values are 0, 1, 2, 3, 4 and 5, representing layer 0, second, minute, hour, day and month respectively. Layers other than layer 0 are optional, but the configured levels must be listed in ascending order. For example, [0,3,4,5] is valid, whereas [0,2,1,3,4,5] is invalid because it is out of order, and [1,2,3,4] is invalid because layer 0 is missing.
“blockSize” refers to the block size set for the tables of each layer. Generally, set the minimum block size to 65536 and the maximum to 1048576; setting it too small or too large is inappropriate. The block size is the amount by which a table is expanded each time its capacity runs out while data are being appended. When table data are read, one whole block is read at a time, regardless of how much actual data the table holds. Because the table is stored by column and by block, each field needs at least two blocks: one stores the block information and the other stores the data. Therefore, when a table has many fields and many of them are read together, the memory required is about (number of fields read × 2 × block size). A smaller block takes up less memory, but when the data volume is very large, a larger block makes reading faster. For this reason, set a smaller block size for low-level tables to save memory, and a larger block size for high-level tables, which hold more data, to speed up reading.
“interval” refers to the layer interval, i.e., the time length measured in the unit of the layer level. Layer 0 and the highest layer have no interval (the highest layer has only one table); the other layers are filled in based on actual needs, and only integers are allowed. The first value applies to layer 1, the second to layer 2, and so on. The number of values is therefore two less than the number of levels.
“discardZone” refers to the list of the discarded table numbers, the content of which is automatically generated by code, and is initially set as [].
“discardTime” refers to the discarded time list, the content of which is automatically generated by code, and is initially set as [].
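The rules stated above for “level” and “interval” can be checked mechanically before the server starts. The following Python sketch is only an illustration of those rules; `validate_config` is a hypothetical helper and is not part of the routine.

```python
import json


def validate_config(cfg):
    """Return a list of problems found in one ubc.json entry (empty if none)."""
    problems = []
    level = cfg["level"]
    interval = cfg["interval"]
    if not level or level[0] != 0:
        problems.append("level must start with layer 0")
    if any(v not in range(6) for v in level):
        problems.append("level values must be between 0 and 5")
    if any(a >= b for a, b in zip(level, level[1:])):
        problems.append("level must be in ascending order")
    # Layer 0 and the highest layer have no interval
    expected = max(len(level) - 2, 0)
    if len(interval) != expected:
        problems.append(f"interval needs {expected} values, got {len(interval)}")
    if any(isinstance(v, bool) or not isinstance(v, int) for v in interval):
        problems.append("interval values must be integers")
    return problems


sample = json.loads('[{"level":[0,3,4,5],"interval":[2,1]}]')[0]
print(validate_config(sample))  # []
```

Running it against `[0,2,1,3,4,5]` or `[1,2,3,4]` reports the ordering problem and the missing layer 0 respectively.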
Files and sub-paths in the main path:
data: refers to the storage path of composite table, and the directory name is set in ubc.json (refer to the description above).
ubc.json: refers to configuration file.
zone.btx: refers to the table number storage file.
The files under ‘data’ include:
The form of file name is ‘table number.composite table file name.ctx’, where the composite table file name can be set in ubc.json (refer to the description above), and the table number is automatically calculated by the system based on the layer level (see section 2 for the rule for calculating table number).
The data situation of a certain e-commerce system is as follows: Number of users: 100-10 million; the account balance table changes at any time; some accounts that have not been used for a long time and have a balance of 0 will be automatically closed; some users will actively close their accounts.
In this example, the amount of update data is relatively large. If we waited a month before merging the update data into the main table, too much update data would accumulate, which would seriously slow down queries. To solve this, we design four layers to store data: layer 0, layer 1 (storing the data of 2 hours), layer 2 (storing the data of one day) and layer 3 (the main table layer, storing the data of one month).
The configuration file is as follows:
[{ "sortKey":"Account,Ttime",
"deleteField":"Deleted",
"otherFields":"Balance,Currency",
"dataDir":"data/",
"dataFilename":"data.ctx",
"queryPeriod":120,
"level":[0, 3,4,5],
"interval":[2,1],
"discardZone":[],
"discardTime":[]
}]
zones: the sequence of sequences, refers to the table number being used in each layer.
Configuration information lock: use “config” as its name.
Lock for modifying zones: use “zones” as its name.
This script is executed only once when the server starts and will no longer be executed during calculating. If the server is started for the first time, it needs to initialize parameters; if the server is started for the nth time, it needs to read the configuration information written out at the end of the n-1th execution.
| | A | B |
|---|---|---|
| 1 | =file("zone.btx") | |
| 2 | if(A1.exists()) | =A1.import@bi() |
| 3 | else | =json(file("ubc.json").read()) |
| 4 | | >B2=B3.level.len().([]) |
| 5 | =env(zones,B2) | |
| 6 | =register("zone","zone.splx") | |
| 7 | =register("zone0","zone0.splx") | |
| 8 | =call@n("merge.splx") | |
A1: read the table number storage file; A2-B4: if the table number file exists, read this file. Otherwise, create a sequence consisting of empty sequences and having a length equal to the number of layers; A5: set the table number sequence as global variable ‘zones’; A6: register the zone.splx script as a function with the same name; A7: register the zone0.splx script as a function with the same name; A8: start the merge thread.
This script writes new update data to layer 0 each time they are received. The input parameter ‘data’ is the table sequence containing the new update data, which must already be sorted by the primary key field of the composite table when passed in. Once the new data are written out, the new table number is added to the table number list.
| | A |
|---|---|
| 1 | =now() |
| 2 | =lock("config") |
| 3 | >config=json(file("ubc.json").read()) |
| 4 | =lock@u("config") |
| 5 | =zone0(A1) |
| 6 | =file(config.dataDir/config.dataFilename:[A5]) |
| 7 | =config.sortKey.split@c().("#"+trim(~)).concat@c() |
| 8 | =if(config.deleteField, A6.create@d(${A7},${config.deleteField},${config.otherFields};;config.blockSize(1)), A6.create(${A7},${config.otherFields};;config.blockSize(1)) ) |
| 9 | =A8.append@i(data.cursor()) |
| 10 | =A8.close() |
| 11 | =lock("zones") |
| 12 | =zones(1)|A5 |
| 13 | >zones=[A12]|zones.to(2,) |
| 14 | =file("zone.btx").export@b(zones) |
| 15 | =lock@u("zones") |
A1: the current time; A3: read the configuration file; A5: calculate the layer-0 table number based on the current time; A6-A8: generate the layer-0 table. The @d option indicates that the first field after the primary key is the deletion flag field with values of true/false; A9: write the update data to the table; A11-A15: record the new table number and back it up to a file.
Notes:
- The fields of the data passed in must follow this order. If the deletion flag field is used: primary key fields, deletion flag field, other fields. If the deletion flag field is not used: primary key fields, other fields.
- The order of fields is the same as that in the configuration file.
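The field-order rule for the incoming data can be expressed as a small helper. This Python sketch only illustrates the ordering described here; `field_order` is a hypothetical name, and the sample values are taken from the example configuration in this document.

```python
def field_order(config):
    """Column order expected for incoming data, derived from ubc.json."""
    keys = [k.strip() for k in config["sortKey"].split(",")]
    others = [f.strip() for f in config["otherFields"].split(",")]
    delete_field = config.get("deleteField")
    # The deletion flag, when configured, comes right after the primary key
    flag = [delete_field] if delete_field else []
    return keys + flag + others


example = {
    "sortKey": "Account,Ttime",
    "deleteField": "Deleted",
    "otherFields": "Balance,Currency",
}
print(field_order(example))
# ['Account', 'Ttime', 'Deleted', 'Balance', 'Currency']
```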
This script is used to calculate the layer-0 table number based on time, for internal use.
Input parameter: tm: time
| | A |
|---|---|
| 1 | 2023 |
| 2 | [27,23,18,13,7,1] |
| 3 | return [year(tm)-A1+8, month(tm),day(tm),hour(tm)+1,minute(tm)+1,second(tm)+1].sum(shift(~,-A2(#)))+1 |
A1: start year of data; A2: the number of binary bits on the right of each layer level, in the order of year, month, day, hour, minute and second.
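The bit layout used for the layer-0 table number may be easier to follow in a Python rendering. This is a sketch, assuming that SPL's `shift(x,-n)` is a left shift by n bits; the `decode` function is a hypothetical addition for checking the layout and does not exist in the routine.

```python
from datetime import datetime

START_YEAR = 2023                 # A1 in the script above
SHIFTS = [27, 23, 18, 13, 7, 1]   # A2: bits to the right of year..second


def zone0(tm):
    """Pack a timestamp into a layer-0 table number."""
    parts = [tm.year - START_YEAR + 8, tm.month, tm.day,
             tm.hour + 1, tm.minute + 1, tm.second + 1]
    # Each component is shifted left to its own bit range; the final +1
    # sets the lowest bit
    return sum(p << s for p, s in zip(parts, SHIFTS)) + 1


def decode(z):
    """Hypothetical inverse of zone0, used only to verify the layout."""
    fields, upper = [], None
    for s in SHIFTS:
        value = z >> s
        if upper is not None:
            value &= (1 << (upper - s)) - 1   # keep only this field's bits
        fields.append(value)
        upper = s
    year, month, day, hour, minute, second = fields
    return datetime(year - 8 + START_YEAR, month, day,
                    hour - 1, minute - 1, second - 1)


t = datetime(2024, 3, 15, 13, 45, 30)
print(decode(zone0(t)) == t)  # True
```

Because the year occupies the highest bits and the second the lowest, table numbers computed this way increase with time, which is what allows tables to be compared and selected by number.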
The script is used to convert the low-layer table number to high-layer table number, for internal use.
Input parameter: z: low-layer table number; n: layer sequence number of the high layer (the sequence number in config.level, not the layer level); config: configuration file content; monthCumulate: whether month accumulation is required;
| | A | B |
|---|---|---|
| 1 | [27,23,18,13,7,1] | |
| 2 | 23 | |
| 3 | =config.interval(n-1) | |
| 4 | >p = 7 - config.level(n) | |
| 5 | >p=if(monthCumulate && p==3,2,p) | |
| 6 | >b = A1(p) | |
| 7 | >z1 = shift(z,b) | |
| 8 | >b1 = A1(p-1)-b | |
| 9 | >s = (and( z1, shift(1,-b1)-1 )-1) \A3*A3 + 1 | |
| 10 | =shift(shift( shift(z1,b1),-b1)+s, -b) | |
| 11 | if(p>2) | return A10 |
| 12 | =and(shift(A10,A1(3)),shift(1,A1(3)-A1(2))-1) | |
| 13 | =and(shift(A10,A1(2)),shift(1,A1(2)-A1(1))-1) | |
| 14 | =shift(A10,A1(1))-8+A2 | |
| 15 | return A14*100000+A13*1000+A12*10 | |
A1: the number of binary bits on the right of each layer level, in the order of year, month, day, hour, minute and second; A2: start year of data, only the last two digits are taken; A3: layer interval of the high layer; A4: based on the layer level of the high layer, calculate its position in A1; A5: if month accumulation is required, change to the month-level position; A6: the number of bits to be truncated for the high layer; A7: the value of the table number after truncation; A8: the number of bits used by the last level after truncation; A9: divide the value of the last level by the layer interval and add 1; A10: put s into z and add 0 at the end; A11: if month accumulation is not required, return the table number directly; A12-A14: calculate the components of year, month, and day respectively; A15: piece together the plain text table number and return.
This thread periodically merges the data of layer n-1 into layer n using the update mechanism. After each merge, the loop restarts from the lowest layer; it moves up to the next layer only when there is nothing to merge at the lower layer. After all layers have been processed, the thread sleeps for the layer-1 interval and then runs again.
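The truncate-and-round steps in A4 to A10 can be sketched in Python as follows. This covers only the branch that returns A10 directly (no month accumulation, target level from second to day), and it assumes that `shift(x,n)` with positive n is a right shift and that `\` is integer division. The names `upper_zone` and `zone0` here are illustrative stand-ins, not the registered SPL functions.

```python
from datetime import datetime

SHIFTS = [27, 23, 18, 13, 7, 1]   # same bit layout as the layer-0 number


def zone0(tm, start_year=2023):
    """Layer-0 table number, as in the zone0 script."""
    parts = [tm.year - start_year + 8, tm.month, tm.day,
             tm.hour + 1, tm.minute + 1, tm.second + 1]
    return sum(p << s for p, s in zip(parts, SHIFTS)) + 1


def upper_zone(z, level, interval):
    """Map a lower-layer table number to the table number of a higher layer.

    level: 1=second, 2=minute, 3=hour, 4=day (month accumulation not covered).
    """
    if level not in (1, 2, 3, 4):
        raise ValueError("this sketch covers levels 1 to 4 only")
    p = 7 - level                        # 1-based position in SHIFTS
    b = SHIFTS[p - 1]                    # bits to truncate
    z1 = z >> b                          # table number after truncation
    b1 = SHIFTS[p - 2] - b               # bits used by the last level
    last = z1 & ((1 << b1) - 1)          # value of the last level (1-based)
    s = (last - 1) // interval * interval + 1   # round down to the interval
    return (((z1 >> b1) << b1) + s) << b        # low bits are left as 0


# With a 2-hour interval, 02:00:00 and 03:59:59 fall into the same table
a = upper_zone(zone0(datetime(2023, 1, 1, 2, 0, 0)), 3, 2)
b = upper_zone(zone0(datetime(2023, 1, 1, 3, 59, 59)), 3, 2)
c = upper_zone(zone0(datetime(2023, 1, 1, 4, 0, 0)), 3, 2)
print(a == b, a < c)  # True True
```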
| | A | B | C | D |
|---|---|---|---|---|
| 1 | =lock("config") | | | |
| 2 | >config=json(file("ubc.json").read()) | | | |
| 3 | =long(now())-config.queryPeriod*1000 | | | |
| 4 | =config.discardTime.pselect@a(~<A3) | | | |
| 5 | =config.discardZone(A4).run(movefile(config.dataDir/~/"."/config.dataFilename)) | | | |
| 6 | >config.discardZone.delete(A4),config.discardTime.delete(A4) | | | |
| 7 | =file("ubc.json").write(json(config)) | | | |
| 8 | =lock@u("config") | | | |
| 9 | = zone0(now(),config ) | | | |
| 10 | =config.level.len()-1 | | | |
| 11 | =if(config.level.m(-1)==5,A10-1,A10) | | | |
| 12 | for A11 | | | |
| 13 | | >zz = zones(A12) | | |
| 14 | | >z =zone(A9,A12+1,config,false) | | |
| 15 | | >zm = zz.select(A12==A10 || ~< z) | | |
| 16 | | if zm.len()>0 | | |
| 17 | | | if(A12<A10 && config.level(A12+1)==4) | |
| 18 | | | | =zm.group(zone( ~, A12+1,config,true):zu;~:zd) |
| 19 | | | else | |
| 20 | | | | >D18=zm.group(zone( ~, A12+1,config,false):zu;~:zd) |
| 21 | | | >zus = zones(A12+1) | |
| 22 | | | =[] | =[] |
| 23 | | | for D18 | |
| 24 | | | | =zus.select@1(xor(~,C23.zu)<=1) |
| 25 | | | | =if(D24,xor(D24,1),C23.zu) |
| 26 | | | | =file(config.dataDir/config.dataFilename:(D24|C23.zd)) |
| 27 | | | | =file(config.dataDir/config.dataFilename:D25) |
| 28 | | | | =D26.reset@w(D27:config.blockSize(#A12+1)) |
| 29 | | | | =lock("config") |
| 30 | | | | >config=json(file("ubc.json").read()) |
| 31 | | | | >config.discardZone.insert(0,(D24|C23.zd)) |
| 32 | | | | >config.discardTime.insert(0,[long(now())]*(D24|C23.zd).len()) |
| 33 | | | | =file("ubc.json").write(json(config)) |
| 34 | | | | =lock@u("config") |
| 35 | | | | >C22=C22|D24,D22=D22|D25 |
| 36 | | | =lock("zones") | |
| 37 | | | =zones(A12)\zm | |
| 38 | | | =(zones(A12+1)\C22)|C23 | |
| 39 | | | >zones=zones.(~).modify(A12,[C37,C38]) | |
| 40 | | | =file("zone.btx").export@b(zones) | |
| 41 | | | =lock@u("zones") | |
| 42 | | | goto A1 | |
| 43 | =config.interval(1) | | | |
| 44 | =sleep(case(config.level(2),2:A43*60,3:A43*3600,4:A43*3600*24;A43)*1000) | | | |
| 45 | goto A1 | | | |
A2: read the configuration file; A3-A6: delete the table that has been discarded for a duration exceeding the max query period; A9: table number of layer 0 at current time; A10: exclude the layer number of the highest layer; A11: if the highest layer is month, there is no need to merge into the main table, so the number of layers that need to be looped is further reduced by 1; A12: loop from layer 0; B13: table number list of layer n; B14: calculate the table number of A9 in the target layer; B15: select the current layer table number that is less than B14. If merging into the main table, select all table numbers; B16: if the current layer has table, then: C17: merge into the day layer, not the highest layer; D18: group by the month table number; C19: otherwise: D20: group by the target layer table number; C22-D22: temporarily store the discard table number and new table number; C23: loop by group; D24: read the table number to which the current group belongs from zus (including two kinds of table numbers with the last bit being 0 or 1); D25: calculate out the new table number. If the table number to which the current group belongs exists, alternate the new table number (if the last bit of the original table number is 1, then the last bit of new table number is 0, and vice versa). Otherwise, calculate the table number based on the current group; D26-D28: merge the current group’s table data with the original table data, and then write to a new table; D29-D34: write the written-out table number to the discard table number list of config, and record the discard time; D35: temporarily store the discard table number and new table number to C22 and D22 respectively; C36-C41: update the table number list and back it up to a file.
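The clean-up performed in A3 to A6 amounts to splitting the discard lists by a cutoff time. The Python sketch below illustrates that selection only; it does not delete any files, and `split_expired` is a hypothetical name. Times are in milliseconds, matching `long(now())` in the script.

```python
def split_expired(discard_zone, discard_time, now_ms, query_period_s):
    """Separate discarded tables that may be deleted from those still kept.

    A table is expired once it has been discarded for longer than
    query_period_s seconds.
    """
    cutoff = now_ms - query_period_s * 1000
    expired = [z for z, t in zip(discard_zone, discard_time) if t < cutoff]
    kept = [(z, t) for z, t in zip(discard_zone, discard_time) if t >= cutoff]
    return expired, [z for z, _ in kept], [t for _, t in kept]


expired, zones_left, times_left = split_expired(
    [1, 2, 3], [100_000, 880_000, 900_000],
    now_ms=1_000_000, query_period_s=120)
print(expired, zones_left, times_left)
# [1] [2, 3] [880000, 900000]
```

Keeping discarded tables for the maximum query period gives queries that started before the merge time to finish reading the old tables.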
This script is used to manually merge the month tables into main table.
Input parameter: t: query time. Merge the month tables updated before this time.
| | A | B |
|---|---|---|
| 1 | =lock("config") | |
| 2 | >config=json(file("ubc.json").read()) | |
| 3 | =lock@u("config") | |
| 4 | =config.level.len() | |
| 5 | if t!=null | =zone0(t,config) |
| 6 | | =zone(B5,A4-1,config,true) |
| 7 | | =zones.m(-2).select(~<B6) |
| 8 | else | >B7=zones.m(-2) |
| 9 | =zones(A4) | |
| 10 | =if(A9.len()==1,xor(A9(1),1),0) | |
| 11 | =file(config.dataDir/config.dataFilename:(A9|B7)) | |
| 12 | =file(config.dataDir/config.dataFilename:A10) | |
| 13 | =A11.reset@w(A12:config.blockSize.m(-1)) | |
| 14 | =lock("config") | |
| 15 | >config=json(file("ubc.json").read()) | |
| 16 | >config.discardZone.insert(0,(A9|B7)) | |
| 17 | >config.discardTime.insert(0,[long(now())]*(A9|B7).len()) | |
| 18 | =file("ubc.json").write(json(config)) | |
| 19 | =lock@u("config") | |
| 20 | =lock("zones") | |
| 21 | =zones(A4-1)\B7 | |
| 22 | =(zones(A4)\A9)|A10 | |
| 23 | >zones=zones.(~).modify(A4-1,[A21,A22]) | |
| 24 | =file("zone.btx").export@b(zones) | |
| 25 | =lock@u("zones") | |
A5-B8: if ‘t’ is not null, select the numbers of month table finished before the time ‘t’. Otherwise, select all month table numbers; A9: the original main table number; A10: the new main table number; A11-A13: merge the original main table with the month tables, and then write to the new main table; A14-A19: write the original main table number and month table numbers to the discard list, and record the discard time; A20-A25: update the table number list and back it up to a file.
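The alternation of the main table number in A10 is a single XOR on the lowest bit. A minimal Python sketch, with `next_main_zone` as a hypothetical name:

```python
def next_main_zone(current):
    """Table number for the new main table.

    current: main table numbers in use (empty before the first merge,
    otherwise exactly one entry, 0 or 1).
    """
    return current[0] ^ 1 if len(current) == 1 else 0


print(next_main_zone([]), next_main_zone([0]), next_main_zone([1]))  # 0 1 0
```

Writing the merged result to the other number means the old main table stays readable until it is removed from the discard list.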
This script is used when querying data and returns the table numbers of multi-zone composite table.
| | A |
|---|---|
| 1 | return zones.rvs().conj() |
Notes:
- The multi-zone composite table must be closed after use. Otherwise, the table cannot be deleted after merge.splx executes the merge operation.
- When generating a multi-zone composite table cursor, the @w option must be added, i.e., cursor@w(), indicating that the update mechanism is adopted to merge tables.
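For readers unfamiliar with `rvs()` and `conj()`, the query script's expression `zones.rvs().conj()` reverses the per-layer lists and concatenates them, so the main table comes first and layer 0 last. A Python equivalent, assuming `zones` is held as a list of lists with layer 0 first (the sample table numbers are made up):

```python
def query_zones(zones):
    """Flatten per-layer table numbers, highest layer first."""
    return [z for layer in reversed(zones) for z in layer]


# Layer 0 holds two tables, layer 1 one, layer 2 none, main layer one
print(query_zones([[5, 6], [3], [], [0]]))  # [0, 3, 5, 6]
```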
Download the routine code: code.zip