forked from ccorcos/tuple-database
-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathConcurrencyLog.ts
101 lines (90 loc) · 2.44 KB
/
ConcurrencyLog.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
import { mutableFilter } from "../helpers/mutableFilter"
import { outdent } from "../helpers/outdent"
import { Bounds, isTupleWithinBounds } from "../helpers/sortedTupleArray"
import { Tuple } from "../storage/types"
import { TxId } from "./types"
/** Log entry recording that transaction `txId` read the range described by `bounds`. */
type ReadItem = { type: "read"; bounds: Bounds; txId: TxId }
/** Log entry recording a write to `tuple`; `txId` is undefined when the write has no transaction id. */
type WriteItem = { type: "write"; tuple: Tuple; txId: TxId | undefined }
/** Any entry kept in the concurrency log, discriminated by its `type` field. */
type LogItem = ReadItem | WriteItem
/**
 * Error describing a write that landed inside bounds another transaction had read.
 *
 * The message names the transaction id, the written tuple and the read bounds.
 */
export class ReadWriteConflictError extends Error {
  /**
   * @param txId Transaction id to report in the message (may be undefined).
   * @param writeTuple The tuple that was written.
   * @param readBounds The read bounds that the written tuple falls within.
   */
  constructor(txId: string | undefined, writeTuple: Tuple, readBounds: Bounds) {
    // The tuple is serialized with JSON.stringify, like the bounds. Plain string
    // interpolation of an array flattens nested arrays, drops null/undefined and
    // prints objects as "[object Object]", which makes the message ambiguous.
    // The template lines stay flush-left so the text handed to `outdent` carries
    // no extra leading whitespace.
    const message = outdent(`
ReadWriteConflictError: ${txId}
Write to tuple ${JSON.stringify(writeTuple)}
conflicted with a read at the bounds ${JSON.stringify(readBounds)}
`)
    super(message)
  }
}
/**
 * Ordered log of transactional reads and the writes that overlap them, used to
 * detect read/write conflicts when a transaction commits.
 */
export class ConcurrencyLog {
  /** Entries in the order they were recorded. "n" below means `this.log.length`. */
  log: LogItem[] = []

  /** Append a read of `bounds` made by transaction `txId`. Constant time. */
  read(txId: TxId, bounds: Bounds) {
    this.log.push({ type: "read", txId, bounds })
  }

  /**
   * Append a write of `tuple`, but only when some read already in the log
   * covers it; writes nobody has read around are not recorded at all.
   * Scans the log once, stopping at the first covering read.
   */
  write(txId: TxId | undefined, tuple: Tuple) {
    const coveredByRead = this.log.some(
      (entry) => entry.type === "read" && isTupleWithinBounds(tuple, entry.bounds)
    )
    if (coveredByRead) {
      this.log.push({ type: "write", tuple, txId })
    }
  }

  /**
   * Check whether any logged write conflicts with a read made by `txId`.
   *
   * Walks the log in order, collecting this transaction's read bounds; every
   * write is tested against the bounds collected so far, so only writes that
   * appear after one of the transaction's reads can conflict. Worst case this
   * is a nested scan over writes and the transaction's reads.
   *
   * Whether or not a conflict is found, the transaction's reads are removed
   * and writes that no longer matter are pruned.
   *
   * @throws ReadWriteConflictError for the first conflicting write found.
   */
  commit(txId: TxId) {
    try {
      const ownReads: Bounds[] = []
      for (const entry of this.log) {
        switch (entry.type) {
          case "read":
            if (entry.txId === txId) ownReads.push(entry.bounds)
            break
          case "write": {
            const { tuple } = entry
            const clash = ownReads.find((bounds) => isTupleWithinBounds(tuple, bounds))
            if (clash !== undefined) {
              throw new ReadWriteConflictError(entry.txId, tuple, clash)
            }
            break
          }
        }
      }
    } finally {
      // Runs on success and on conflict alike.
      this.cleanupReads(txId)
      this.cleanupWrites()
    }
  }

  /** Abandon transaction `txId`: drop its reads, then prune writes, without any conflict check. */
  cancel(txId: TxId) {
    this.cleanupReads(txId)
    this.cleanupWrites()
  }

  /**
   * Remove every read entry that belongs to `txId`; all other entries are kept.
   * Relies on `mutableFilter` dropping, in place, the entries for which the
   * callback returns false (assumed from the helper's name and its use here).
   */
  cleanupReads(txId: string) {
    mutableFilter(this.log, (entry) => entry.type !== "read" || entry.txId !== txId)
  }

  /**
   * Remove writes that are not covered by any read appearing earlier in the log.
   * Reads are always kept; each write is tested against the reads seen before it.
   */
  cleanupWrites() {
    const earlierReads: Bounds[] = []
    mutableFilter(this.log, (entry) => {
      if (entry.type !== "read") {
        const { tuple } = entry
        return earlierReads.some((bounds) => isTupleWithinBounds(tuple, bounds))
      }
      earlierReads.push(entry.bounds)
      return true
    })
  }
}