-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathrollups.sql
53 lines (48 loc) · 1.77 KB
/
rollups.sql
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
CREATE TABLE rollups (
name text primary key,
event_table_name text not null,
event_id_sequence_name text not null,
last_aggregated_id bigint default 0
);
CREATE OR REPLACE FUNCTION incremental_rollup_window(rollup_name text, OUT window_start bigint, OUT window_end bigint)
RETURNS record
LANGUAGE plpgsql
AS $function$
DECLARE
table_to_lock regclass;
BEGIN
/*
* Perform aggregation from the last aggregated ID + 1 up to the last committed ID.
* We do a SELECT .. FOR UPDATE on the row in the rollup table to prevent
* aggregations from running concurrently.
*/
SELECT event_table_name, last_aggregated_id+1, pg_sequence_last_value(event_id_sequence_name)
INTO table_to_lock, window_start, window_end
FROM rollups
WHERE name = rollup_name FOR UPDATE;
IF NOT FOUND THEN
RAISE 'rollup ''%'' is not in the rollups table', rollup_name;
END IF;
IF window_end IS NULL THEN
/* sequence was never used */
window_end := 0;
RETURN;
END IF;
/*
* Play a little trick: We very briefly lock the table for writes in order to
* wait for all pending writes to finish. That way, we are sure that there are
* no more uncommitted writes with a identifier lower or equal to window_end.
* By throwing an exception, we release the lock immediately after obtaining it
* such that writes can resume.
*/
BEGIN
EXECUTE format('LOCK %s IN SHARE ROW EXCLUSIVE MODE', table_to_lock);
RAISE 'release table lock' USING ERRCODE = 'RLTBL';
EXCEPTION WHEN SQLSTATE 'RLTBL' THEN
END;
/*
* Remember the end of the window to continue from there next time.
*/
UPDATE rollups SET last_aggregated_id = window_end WHERE name = rollup_name;
END;
$function$;