SPL practice: a space-time collision problem that MPP is powerless to solve
A given time interval (such as 7 days) is divided into multiple time slices of fixed length (such as 15 minutes). If object a and object b appeared at the same location in the same time slice, we call it one collision.
Rule 1: Multiple collisions in the same time slice are counted as one collision.
Rule 2: In a time slice, if the last locations at which the two objects appeared are different, the slice is called a mismatch. The collisions (including those in other time slices) are considered effective only when the number of mismatched time slices does not exceed 20.
Requirement: given object ‘a’, find the top 20 objects ‘b’ that have the largest number of effective collisions with ‘a’ within the specified time interval, subject to the two rules above.
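To pin down the two rules before any optimization, here is a minimal brute-force sketch in Python (the article itself uses SPL). The function name top_collisions and its parameters are hypothetical, and it assumes, as an interpretation, that a time slice can only be a mismatch when both objects appear in it. It is meant for checking faster versions on small data, not for 56 billion rows.

```python
import heapq
from collections import defaultdict

def top_collisions(records, a, st, pt=900, max_mismatch=20, k=20):
    """Brute-force reference for Rule 1 and Rule 2 (hypothetical helper).

    records: iterable of (no, ct, loc); a: id of the given object;
    st: start timestamp of the interval; pt: slice length in seconds.
    Returns up to k object ids, most effective collisions first.
    """
    locs = defaultdict(set)   # (no, slice) -> set of locs visited in that slice
    last = {}                 # (no, slice) -> (ct, loc) of the latest record
    for no, ct, loc in records:
        key = (no, (ct - st) // pt)
        locs[key].add(loc)
        if key not in last or ct >= last[key][0]:
            last[key] = (ct, loc)
    a_slices = {i for (no, i) in locs if no == a}
    hits = defaultdict(int)    # b -> slices with at least one collision (Rule 1)
    misses = defaultdict(int)  # b -> slices whose last locations differ (Rule 2)
    for (no, i), b_locs in locs.items():
        if no == a or i not in a_slices:
            continue           # assumption: no mismatch unless both objects are present
        if b_locs & locs[(a, i)]:
            hits[no] += 1
        if last[(no, i)][1] != last[(a, i)][1]:
            misses[no] += 1
    effective = [(cnt, b) for b, cnt in hits.items() if misses[b] <= max_mismatch]
    return [b for cnt, b in heapq.nlargest(k, effective)]
```

With tiny hand-made data, object 2 collides with object 1 in two slices but mismatches in one, so it drops out when no mismatches are allowed.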
The data is stored in a single table, with approximately 8 billion records per day. Each object has an average of 1,000 records, and each record stores the time and space information of an object (object flag, timestamp, space tag). When the time interval is 7 days, the total data volume is 56 billion rows. The data structure is as follows:
| Field name | Field type | Field meaning | Sample data |
|---|---|---|---|
| no | String | Unique id of object | 100000000009 |
| ct | Int | Timestamp (accurate to the second) | 1690819200 |
| lac | String | Space tag 1 | 40000 |
| ci | String | Space tag 2 | 66000000 |
Values of the ‘no’ field consist entirely of digits, and the fields ‘lac’ and ‘ci’ always appear in pairs. For convenience, we refer to ‘lac’ and ‘ci’ together as a single field ‘loc’; the number of distinct ‘loc’ values does not exceed 270,000.
We hoped to obtain the result within one hour on a cluster of five 64C256G servers (64 cores and 256 GB of memory each). Unfortunately, this expectation was not met, even with a world-renowned MPP database.
Indeed, it is not easy for a relational database to solve this problem quickly. Let us try to express the task in SQL while ignoring rule 2:
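WITH DT AS ( SELECT DISTINCT no, loc, FLOOR(ct/900) AS ct FROM T )   -- ct becomes a 15-minute (900 s) slice number
SELECT TOP 20 * FROM
( SELECT B.no, COUNT(DISTINCT B.ct) AS cnt   -- Rule 1: at most one collision per time slice
FROM DT AS A JOIN DT AS B ON A.loc=B.loc AND A.ct=B.ct
WHERE A.no=a AND B.no<>a   -- a: id of the given object
GROUP BY B.no ) AS R
ORDER BY cnt DESC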
The DISTINCT and JOIN operations in SQL involve calculating and comparing HASH values. When the data volume is large, this computation is heavy and seriously affects performance. Moreover, both operations involve random access, which is usually performed in memory; when the data is too large to fit in memory, it has to be buffered to external storage, which causes a sharp decline in performance and may even lead to out-of-memory errors. Rule 1 alone already makes the SQL calculation very slow; once Rule 2 is taken into account, it is not surprising that the MPP database cannot compute the result.
If we first take out all relevant records of objects a and b in the time interval, store them as in-memory sets, and then count the number of effective collisions between a and b, the problem is not very difficult. Since each object has relatively few records (fewer than 10,000 even for a 7-day interval), holding them in memory is no problem.
Let the record set of a be A and that of b be B. Divide A into groups A1, ..., An by time slice, divide B into groups B1, ..., Bn likewise, and sort the members of each Ai and Bi by ct in ascending order.
The number of collisions between a and b within time slice i (without considering the two rules) is calculated as follows:
Ci=Bi.count(Ai.(loc).contain(loc))
That is, count how many members of Bi have a loc that also appears in Ai.
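The expression above is effectively a two-layer loop. A minimal Python sketch of the same count, where each slice is assumed to be a list of (ct, loc) pairs and the function name is hypothetical:

```python
def collisions_in_slice(a_slice, b_slice):
    """Naive C_i: how many members of B_i have a loc that also appears in A_i.

    a_slice, b_slice: lists of (ct, loc) pairs for one time slice.
    For every member of B_i the inner any() rescans A_i, so the cost
    is O(len(A_i) * len(B_i)).
    """
    return sum(1 for _, b_loc in b_slice
               if any(a_loc == b_loc for _, a_loc in a_slice))
```

C_i counts members of B_i, so a loc visited twice by b counts twice; Rule 1 later only asks whether C_i > 0.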
However, such a two-layer loop is slow. Since a, and therefore each Ai, stays fixed while b varies, we can deduplicate the locs in Ai in advance and build an index on them, as follows:
Ai’=Ai.id(loc).key@i(loc)
Ci=Bi.switch@i(loc,Ai’).len()
By using switch@i to filter out the members of Bi whose loc is not found in Ai, we obtain the number of collisions as well.
We only need to calculate the number of time slices with Ci>0 to get the number of collisions that satisfy rule 1.
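In Python terms, the deduplicated, indexed Ai' can be approximated by a set built once per object a, after which each lookup is a hash probe instead of a scan. This is a sketch with hypothetical names; slices are assumed to be dicts mapping a slice number to a list of loc values.

```python
def build_a_index(a_slices):
    """Deduplicate the locs of each A_i once, since a is fixed while b varies."""
    return {i: set(locs) for i, locs in a_slices.items()}

def slice_collisions(a_index, b_slices):
    """C_i for every slice of b: members of B_i whose loc is found in A_i'.

    Members with no match are simply not counted, which has the same
    effect as filtering them out with switch@i and taking the length.
    """
    empty = frozenset()
    return {i: sum(1 for loc in b_locs if loc in a_index.get(i, empty))
            for i, b_locs in b_slices.items()}

def rule1_collisions(c):
    """Rule 1: a slice with any collision counts exactly once."""
    return sum(1 for v in c.values() if v > 0)
```

The index is built once per a and reused for every b, which is where the saving over the nested loop comes from.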
Similarly, we can use
Di=Ai.m(-1).loc!=Bi.m(-1).loc // m(-1) means taking the last member of a set
to determine whether there is a mismatch between a and b in the time slice i.
Once Ci and Di are available, it is easy to calculate the number of effective collisions between a and b.
if(count(Di)<=20,count(Ci>0))
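A Python sketch of this final step, assuming each slice's locs are kept in ct order so that [-1] plays the role of m(-1). The function name is hypothetical, and skipping slices where either object is absent is our assumption, since Di is undefined there.

```python
def effective_collisions(a_slices, b_slices, c, max_mismatch=20):
    """if(count(D_i) <= max_mismatch, count(C_i > 0)), else None.

    a_slices, b_slices: dict slice -> non-empty list of locs in ct order,
    so [-1] is the last appearance, like SPL's m(-1).
    c: dict slice -> C_i.
    """
    # Assumption: D_i is only evaluated in slices where both objects appear.
    d = sum(1 for i in a_slices.keys() & b_slices.keys()
            if a_slices[i][-1] != b_slices[i][-1])
    if d > max_mismatch:
        return None
    return sum(1 for v in c.values() if v > 0)
```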
What remains is the common task of calculating TopN for this value.
If the data is ordered by no and ct, this idea is also easy to implement. Specifically, we can take out A in one go using binary search, and then traverse the objects b from the beginning. Since the data is in order, it is easy to take out the corresponding B each time. Because both A and B are ordered by ct, we can use the ordered grouping algorithm to calculate Ai and Bi, which also guarantees the correctness of m(-1) above.
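The single-pass structure can be sketched in Python over a list sorted by (no, ct). Binary search (bisect) locates a's records, and itertools.groupby performs ordered grouping, both by object and by time slice, because the sort order guarantees that equal keys are adjacent. The function names are hypothetical.

```python
from bisect import bisect_left, bisect_right
from itertools import groupby

def slices_of(rows, st, pt):
    """Ordered grouping of one object's rows (sorted by ct) into time slices.

    Slice numbers never decrease as ct grows, so equal keys are adjacent
    and each list keeps ct order (its last item is the last location).
    """
    return {i: [loc for _, _, loc in g]
            for i, g in groupby(rows, key=lambda r: (r[1] - st) // pt)}

def scan(rows, a, st, pt=900):
    """rows: list of (no, ct, loc) sorted by (no, ct).

    Yields (b, A_slices, B_slices) for every object b other than a.
    """
    nos = [r[0] for r in rows]  # stands in for a stored, sorted 'no' column
    lo, hi = bisect_left(nos, a), bisect_right(nos, a)
    a_slices = slices_of(rows[lo:hi], st, pt)  # A taken out in one go
    for b, grp in groupby(rows, key=lambda r: r[0]):
        if b != a:
            yield b, a_slices, slices_of(list(grp), st, pt)
```

Each yielded pair of slice dicts can then be fed to the Ci and Di calculations described above.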
Regrettably, relational databases cannot guarantee ordered storage of data, nor do they provide ordered computation algorithms, so we would have to write very complicated SQL code with multiple nested layers.
Fortunately, we have SPL, which provides ordered storage and related computing mechanisms, making it easy to implement the above-mentioned idea.
Based on this idea, there are also some engineering optimizations.
Convert the values of the ‘no’ field to numbers; combine ‘lac’ and ‘ci’ into a single ‘loc’ and convert its values to sequence numbers (the original values are strings that must be converted to numbers anyway, so they can be mapped to sequence numbers in the same step).
The data structure after conversion is as follows:
| Field name | Data type | Field meaning | Sample data |
| --- | --- | --- | --- |
| no | Long | Unique id of object | 100000000009 |
| ct | Int | Timestamp (accurate to the second) | 1690819200 |
| loc | Int | Location tag | 10282 |
Compared with the original data structure, two modifications are made during the data dump:
1. Combine the ‘lac’ and ‘ci’ fields into a single ‘loc’ field and convert its values to Int sequence numbers. The original ‘lac’ and ‘ci’ fields are stored separately as a dimension table.
2. Convert the data type of the ‘no’ field from numeric string to Long integer.
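A minimal Python sketch of the two conversions, assuming raw rows arrive as (no, ct, lac, ci) tuples with string no, lac and ci. The dictionary encoding assigns 1-based sequence numbers in order of first appearance and keeps the (lac, ci) pairs as a dimension list for restoring results; the function name and the numbering order are our choices, not necessarily what the SPL dump does.

```python
def integerize(raw_rows):
    """raw_rows: iterable of (no: str, ct: int, lac: str, ci: str).

    Returns (rows, dim): rows are (no: int, ct, loc: int), and
    dim[loc - 1] == (lac, ci) is the dimension table used to restore results.
    """
    codes = {}   # (lac, ci) -> 1-based sequence number
    dim = []     # position loc - 1 holds the original (lac, ci) pair
    rows = []
    for no, ct, lac, ci in raw_rows:
        key = (lac, ci)
        if key not in codes:
            dim.append(key)
            codes[key] = len(dim)
        rows.append((int(no), ct, codes[key]))
    return rows, dim
```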
As mentioned in the previous analysis, an index is built for the Ai of each time slice. However, Ai is very small (about 10 members on average), and indexing such a small set brings little benefit. In practice, we therefore build one index for the entire A (about 1000 members) instead, which requires adding the time slice number i to the primary key. The code is roughly as follows:
A’=A.derive((ct-st)\900:i).groups(i,loc).index()
where st is the starting timestamp of the time interval, and integer division by 900 (\900) means a new time slice begins every 900 seconds.
Then, the calculation of Ci needs to be changed to first associate (filter) and then group:
B.derive((ct-st)\900:i).join@i(i:loc,A).groups(i;count(1):C)
This yields a table sequence with fields i and C (that is, Ci), and join@i has already filtered out the records that do not collide.
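In Python, the same restructuring might look like the sketch below: a single set keyed by (slice, loc) replaces the per-slice indexes, and counting only the B records that hit the set mimics join@i followed by grouping. A and B are assumed to be lists of (ct, loc) pairs; the names are hypothetical.

```python
from collections import Counter

def index_a(a_rows, st, pt=900):
    """One index over all of A, keyed by (time slice, loc)."""
    return {((ct - st) // pt, loc) for ct, loc in a_rows}

def c_per_slice(b_rows, a_index, st, pt=900):
    """Join B against the index, dropping misses, then group by slice: {i: C_i}."""
    keys = (((ct - st) // pt, loc) for ct, loc in b_rows)
    return Counter(i for i, loc in keys if (i, loc) in a_index)
```

As with join@i, slices in which b never collides with a do not appear in the result at all.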
When join@i uses the index to associate and filter, it still needs to calculate and compare HASH values, which adds some computation. In fact, the number of possible combinations of ‘i’ and ‘loc’ is at most 7 (days) * 96 (15-minute slices per day) * 270,000, about 181 million, which is not very large. If we use a sequence of boolean values to record whether A appeared at each ‘loc’ in each time slice, its length is at most 7*96*270,000, which does not put much pressure on memory capacity. We can then use the aligned sequence technique to associate and filter, which avoids computing and comparing HASH values and speeds up the calculation of Ci.
Use aloc to represent the aligned sequence of A:
aloc=A.align@a(672,(ct-st)\900+1).(x=270000.(false),~.run(x(loc)=true),x)
Because there are two dimensions, time slice and location, we use a two-layer aligned sequence. A is divided into 672 (7*96) groups by time slice, and each group becomes a sequence of 270,000 boolean members. For an object that appeared at location ‘loc’ in time slice ‘i’, aloc(i)(loc) tells us immediately whether it collides with ‘a’ (that is, whether ‘a’ also appeared at ‘loc’ in time slice ‘i’).
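A Python sketch of the two-layer aligned sequence, using one bytearray per time slice so that a lookup is plain positional indexing with no hashing. Slices are 0-based here while locs keep their 1-based sequence numbers (index 0 of each row is unused); the names are hypothetical. At one byte per cell, 672 slices of 270,001 entries take roughly 181 MB.

```python
def build_aloc(a_rows, st, n_slices, n_loc, pt=900):
    """Two-layer aligned sequence: aloc[i][loc] == 1 iff a was at loc in slice i.

    a_rows: (ct, loc) pairs of object a; loc is a 1-based sequence number,
    so each row has n_loc + 1 cells and cell 0 is never used.
    """
    aloc = [bytearray(n_loc + 1) for _ in range(n_slices)]
    for ct, loc in a_rows:
        aloc[(ct - st) // pt][loc] = 1
    return aloc

def collides(aloc, st, ct, loc, pt=900):
    """Positional lookup, no hashing: did a visit loc in the slice of ct?"""
    return aloc[(ct - st) // pt][loc] == 1
```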
The last location of ‘a’ in each time slice can also be represented with a sequence:
alast=A.align@a(672,(ct-st)\900+1).(~.m(-1).loc)
alast(i) is the last location of ‘a’ in time slice ‘i’. Likewise, it can be accessed directly by time slice number, so Di can be calculated quickly.
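alast can be sketched the same way, as a plain list indexed by slice number. Because the rows of a are assumed to be sorted by ct, later records overwrite earlier ones, which matches ~.m(-1).loc. Using 0 as the marker for a slice where a is absent is our assumption and is safe only because loc sequence numbers start at 1; the mismatch helper likewise treats a slice where either object is absent as not mismatched.

```python
def build_alast(a_rows, st, n_slices, pt=900):
    """alast[i] = loc of a's last record in slice i, or 0 if a is absent.

    a_rows: (ct, loc) pairs sorted by ct, so the last write per slice wins.
    """
    alast = [0] * n_slices
    for ct, loc in a_rows:
        alast[(ct - st) // pt] = loc
    return alast

def is_mismatch(alast, i, b_last):
    """D_i: both objects present in slice i and their last locations differ."""
    return alast[i] != 0 and b_last != 0 and alast[i] != b_last
```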
The algorithm described above requires the data to be sorted by no and ct. However, new data is added every day, and it is usually ordered only by ct, or not ordered at all. Sorting all the data every time would be very slow. Even if only the new data were sorted and then merged, we would still have to rewrite 56 billion rows, which is too time-consuming.
SPL's multi-zone composite table can logically merge multiple ordered composite tables into one larger ordered composite table. We can therefore store each day's new data in its own composite table (a zone table) and merge the zone tables on the fly during computation. This avoids rewriting the full data every day, and the merged data still supports parallel computing. Merging the zone tables takes some extra time when reading, but it is worth it because it makes data maintenance flexible.
When the historical data expires, just delete the zone table file of the corresponding date directly, which is very simple.
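Reading several individually sorted zone tables as one ordered whole is, conceptually, a k-way merge. The Python sketch below uses heapq.merge to illustrate that concept only; it is not how SPL implements multi-zone composite tables, and the zone contents are made-up rows of (no, ct, loc). Nothing is rewritten: expiring a day just means leaving its zone out of the list.

```python
import heapq

def merged_view(zones):
    """Lazily merge per-day zones, each sorted by (no, ct), into one (no, ct) order.

    zones: list of iterables of (no, ct, loc); no zone is modified or copied.
    """
    return heapq.merge(*zones, key=lambda r: (r[0], r[1]))
```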
Store the data by day: sort each day's data by no and ct and save it as a columnar composite table. For example, store 7 days of data as 1.day.ctx, ..., 7.day.ctx; these 7 zone tables make up one multi-zone composite table. The script to generate the test data can be written as:
| | A | B | C |
|---|---|---|---|
| 1 | =rand@s(1) | | |
| 2 | for n | =file("day"/A2/".btx") | |
| 3 | | =movefile(B2) | |
| 4 | | =elapse@s(sd,(A2-1)*86400) | |
| 5 | | =long(B4)\1000 | |
| 6 | | for nm | =1000000.new(100000000000+rand(8000000):no,int(B5+rand(86400)):ct,int(rand(270000)+1):loc) |
| 7 | | | =B2.export@ab(C6) |
| 8 | | =file(A2/".day.ctx").create@py(#no,#ct,loc) | |
| 9 | | =B2.cursor@b().sortx(#1,#2) | |
| 10 | | >B8.append@i(B9) | |
| 11 | | =movefile(B2) | |
There are 3 parameters:
1. n: the number of days, for example, 1 represents one day.
2. nm: the number of records per day in millions, for example, 1000 represents 1 billion.
3. sd: the start date, such as 2023-08-01.
In B8, the @p option used when creating the composite table makes the first field, ‘no’, the segmentation key. The composite table must be segmented for parallel computing, and the records of one ‘no’ must not be split across two segments; the @p option guarantees that all records with the same ‘no’ fall into one segment.
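How segment boundaries might be chosen so that one no never spans two segments can be illustrated with a small Python sketch: start from evenly spaced cut points in a list sorted by no and push each cut forward until the value of no changes. This illustrates the constraint only; the function is hypothetical and says nothing about how @p is actually implemented.

```python
def segment_bounds(nos, k):
    """Split a list sorted by `no` into at most k half-open ranges [start, end),
    moving each cut forward so that no value of `no` spans two ranges.
    """
    n = len(nos)
    bounds, start = [], 0
    for s in range(1, k + 1):
        end = n if s == k else max(start, n * s // k)  # even cut point
        while 0 < end < n and nos[end] == nos[end - 1]:
            end += 1                                   # push past the current no
        if end > start:
            bounds.append((start, end))
            start = end
    return bounds
```

When a single no dominates the data, fewer than k segments come out, which is the price of never splitting an object.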
A | |
---|---|
1 | =now() |
2 | 270000 |
3 | =n*24*3600\pt |
4 | =file("day.ctx":to(n)).open() |
5 | =A4.cursor@m(ct,loc;no==src_no).fetch().align@a(A3,(ct-st)\pt+1) |
6 | =alast=A5.(~.m(-1).loc) |
7 | =aloc=A5.(x=A2.(false),~.run(x(loc)=true),x) |
8 | =A4.cursor@m(;no!=src_no).derive((ct-st)\pt+1:tn,aloc(tn)(loc):loca,alast(tn):lasta) |
9 | =A8.group@s(no,tn;lasta,count(loca):cnt,top@1(-1,0,loc):lastb) |
10 | =A9.group@s(no;count(cnt>0):cnt,count(lasta && lastb && lastb!=lasta):dcnt) |
11 | =A10.select(cnt>0 && dcnt<=A3).total(top(-20;cnt)) |
12 | =file("app2_result.csv").export@ct(A11.new(src_no,no:dst_no,cnt:count)) |
13 | =interval@ms(A1,now()) |
There are 4 parameters:
1. src_no: the id of object a, such as 100000000009.
2. st: the start timestamp in seconds, such as 1690819200, which corresponds to 2023-08-01 00:00:00 (UTC+8).
3. n: the number of days to be counted, such as 7.
4. pt: the length of a time slice in seconds, such as 900.
A3: the total number of time slices in the time interval to be counted;
A5: read the data of object a, generate the time slice sequence numbers, and group by them. Because the composite table is ordered by no, the condition ‘no==src_no’ locates the target data quickly;
A6: calculate the last location value of a in each time slice based on A5;
A7: calculate the aligned sequence of a based on A5. The calculation principle has been explained earlier;
A8: traverse the other objects (all except a) and generate the time slice sequence number tn (a new name, to distinguish it from a's slice number). For each record, look up aloc to check whether the current object collides with a at location loc in time slice tn and store the result in loca; also take the last location of a in time slice tn from alast and store it in lasta.
A9: group by object and time slice; count the true values of loca to get the number of collisions ‘cnt’ between the object and a in the time slice, which is the Ci analyzed previously; keep lasta, and take the last loc of the object in the time slice as lastb;
A10: group further by object and calculate the number of collisions (under rule 1) and the number of mismatches between the object and a. Since a time slice with Ci>0 counts as one collision, we write count(cnt>0) here and store the result in a new cnt; a time slice in which both objects appear (lasta and lastb are both non-null) and their last locations differ counts as one mismatch, and the number of such slices is stored in dcnt.
A11: filter out objects without effective collisions and take the top 20 objects with the most effective collisions. The condition should actually be dcnt<=20, but this test uses dcnt<=A3: in the randomly generated data almost no object satisfies count(Di)<=20, so the correct condition would produce an empty result. Since count(Di) can never exceed A3, dcnt<=A3 ensures that a result is always produced. As a consequence, the test performs more computation than it would on real data, which puts the measured performance at a disadvantage rather than an advantage.
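For readers who want to trace A5 to A11 step by step, here is a single-process Python sketch of the same logic over a list of (no, ct, loc) rows sorted by (no, ct). It is hypothetical and nowhere near the SPL version's performance: it finds a's rows with a linear scan instead of exploiting the sorted layout, and it has no parallelism or columnar storage. Unlike the test script, it applies the real dcnt<=20 condition by default (max_mismatch), and it assumes locs are 1-based sequence numbers and every ct lies inside the interval.

```python
import heapq
from itertools import groupby

def collision_top(rows, src_no, st, n_days, pt=900, n_loc=270000,
                  max_mismatch=20, k=20):
    """rows: list of (no, ct, loc) sorted by (no, ct).

    Returns [(dst_no, cnt)] for the top-k objects with effective collisions.
    """
    n_slices = n_days * 24 * 3600 // pt                            # A3
    a_rows = [(ct, loc) for no, ct, loc in rows if no == src_no]   # A5 (linear scan here)
    aloc = [bytearray(n_loc + 1) for _ in range(n_slices)]         # A7
    alast = [0] * n_slices                                         # A6, 0 = a absent
    for ct, loc in a_rows:                                         # rows are in ct order
        tn = (ct - st) // pt
        aloc[tn][loc] = 1
        alast[tn] = loc
    scored = []
    others = (r for r in rows if r[0] != src_no)                   # A8
    for no, obj in groupby(others, key=lambda r: r[0]):
        cnt = dcnt = 0
        for tn, grp in groupby(obj, key=lambda r: (r[1] - st) // pt):  # A9
            locs = [loc for _, _, loc in grp]
            if any(aloc[tn][loc] for loc in locs):                 # C_i > 0 (A10)
                cnt += 1
            lastb, lasta = locs[-1], alast[tn]
            if lasta and lastb != lasta:                           # D_i (A10)
                dcnt += 1
        if cnt > 0 and dcnt <= max_mismatch:                       # A11
            scored.append((cnt, no))
    return [(no, cnt) for cnt, no in heapq.nlargest(k, scored)]
```

On the small sample used in the tests, object 2 collides in two slices with one mismatch and object 3 in one slice with none, so both qualify under the default threshold.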
The above code assumes that the values of the ‘no’ field are already integers and that ‘lac’ and ‘ci’ have already been combined and converted to sequence numbers. In practice, the data must first be converted and organized, and the results restored after computation. For details, see SPL Practice: integerization during data dump.
When the total time span is 7 days (the total data volume is 56 billion rows) and the time slice is 15 minutes, computing in SPL on a single machine (8C64G) takes 121 seconds.
In fact, achieving this performance requires a few column-wise computing options of SPL Enterprise Edition. Since these options do not change the principles analyzed above, we do not describe them in detail here.
This article discusses a typical object counting problem, which generally has the following characteristics:
1. Count the number of objects that satisfy a certain condition.
2. The number of objects is very large, but the amount of data involved in each object is not large.
3. The condition is very complex, usually order-related, and takes several steps to evaluate.
When faced with such a problem, a common approach is to sort the data by object, load the data of one object at a time into memory, and then perform the complex conditional judgment.
Such operation is very common in practice, such as counting the accounts of a bank, performing a funnel analysis on users of an e-commerce company, and so on.
Such calculations are difficult to implement in SQL because SQL cannot guarantee ordered storage of data and lacks order-related operations. Complex judgments are also hard to express in SQL. Solving such a problem in SQL often requires very complicated, deeply nested statements or stored procedures, and in either case the execution performance is very poor.
In contrast, SPL provides ordered storage and order-related algorithms, and supports complex process calculation, making it convenient to implement such counting calculation.