-
Notifications
You must be signed in to change notification settings - Fork 4
/
Copy pathBatchDownloader.ts
151 lines (125 loc) · 4.07 KB
/
BatchDownloader.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
import { assert, EthereumAddress, Hex } from '@byor/shared'
import { Logger } from '@l2beat/backend-tools'
import { zipWith } from 'lodash'
import { decodeFunctionData, GetLogsReturnType, parseAbiItem } from 'viem'
import {
FetcherRecord,
FetcherRepository,
} from '../peripherals/database/FetcherRepository'
import { EthereumClient } from '../peripherals/ethereum/EthereumClient'
import { abi } from './abi'
/**
 * A single batch recovered from one `BatchAppended` event, as returned by
 * `BatchDownloader.getNewBatches`.
 */
export interface Batch {
// Address taken from the event's `sender` argument.
poster: EthereumAddress
// Timestamp of the block header looked up by the event's block hash,
// converted to a `Date` by multiplying the header timestamp by 1000.
timestamp: Date
// First decoded argument of the input of the transaction that emitted the event.
calldata: Hex
}
// Parsed ABI item for the `BatchAppended(address sender)` event; it is passed
// to the client when querying logs.
const eventAbi = parseAbiItem('event BatchAppended(address sender)')
// Static type of the parsed event ABI item above.
type EventAbiType = typeof eventAbi
// viem's log-array type specialised to the `BatchAppended` event.
export type BatchAppendedLogsType = GetLogsReturnType<EventAbiType>
/**
 * Incrementally downloads batches announced by `BatchAppended` events of a
 * single contract.
 *
 * The downloader keeps a checkpoint (`lastFetchedBlock`) that is loaded from
 * the `FetcherRepository` in `start()` and written back after every
 * successful `getNewBatches()` call.
 */
export class BatchDownloader {
  /** Last block whose events have already been fetched and checkpointed. */
  private lastFetchedBlock = 0n

  /**
   * @param client - Used to read the chain id, head block number, logs,
   *   transactions and block headers.
   * @param fetcherRepository - Stores the per-chain `lastFetchedBlock`.
   * @param contractAddress - Address whose `BatchAppended` logs are queried.
   * @param logger - Base logger; replaced by `logger.for(this)`.
   * @param reorgOffset - Number of blocks below the chain head that are
   *   never queried.
   * @param maxBlocksPerQuery - Maximum number of blocks a single
   *   `getNewBatches()` call advances past `lastFetchedBlock`.
   */
  constructor(
    private readonly client: EthereumClient,
    private readonly fetcherRepository: FetcherRepository,
    private readonly contractAddress: EthereumAddress,
    private readonly logger: Logger,
    private readonly reorgOffset = 15n,
    private readonly maxBlocksPerQuery = 10_000n,
  ) {
    this.logger = logger.for(this)
  }

  /**
   * Loads the stored checkpoint for the client's chain id (or the
   * repository's default record) into memory.
   */
  async start(): Promise<void> {
    const fetcher = await this.fetcherRepository.getByChainIdOrDefault(
      this.client.getChainId(),
    )
    this.lastFetchedBlock = fetcher.lastFetchedBlock

    this.logger.info('Started', {
      lastFetchedBlock: Number(this.lastFetchedBlock),
    })
  }

  /** @returns The in-memory checkpoint: the last block already fetched. */
  getLastFetchedBlock(): bigint {
    return this.lastFetchedBlock
  }

  /**
   * Fetches the batches emitted in the next block range and advances the
   * checkpoint to the end of that range.
   *
   * The range starts at `lastFetchedBlock + 1` and ends at the smaller of
   * `head - reorgOffset` and `lastFetchedBlock + maxBlocksPerQuery`.
   *
   * @returns One `Batch` per fetched event, or an empty array when the range
   *   is empty.
   * @throws Whatever the client or the repository throws. If persisting the
   *   new checkpoint fails, the in-memory checkpoint is restored so the same
   *   range is fetched again by the next call.
   */
  async getNewBatches(): Promise<Batch[]> {
    this.logger.debug('Fetching new events', {
      contractAddress: this.contractAddress.toString(),
      eventAbi: eventAbi.name,
      lastFetchedBlock: this.lastFetchedBlock.toString(),
    })

    const lastBlock = await this.client.getBlockNumber()
    const fromBlock = this.lastFetchedBlock + 1n

    // Compare as bigints: converting through Number would silently round
    // block numbers above 2^53.
    const reorgSafeBlock = lastBlock - this.reorgOffset
    const queryLimitBlock = this.lastFetchedBlock + this.maxBlocksPerQuery
    const toBlock =
      reorgSafeBlock < queryLimitBlock ? reorgSafeBlock : queryLimitBlock

    if (fromBlock > toBlock) {
      this.logger.debug('No new events')
      return []
    }

    const events = await this.client.getLogsInRange(
      eventAbi,
      this.contractAddress,
      fromBlock,
      toBlock,
    )
    this.logger.debug('Fetched new events', { length: events.length })

    const calldata = await this.eventsToCallData(events)
    const timestamps = await this.eventsToTimestamps(events)
    const posters = eventsToPosters(events)

    assert(
      events.length === posters.length &&
        events.length === timestamps.length &&
        events.length === calldata.length,
      'The amount of calldata is not equal to the amount of poster address or amount of timestamps',
    )

    // Advance the checkpoint, but undo the in-memory change if it cannot be
    // persisted; otherwise the batches of this range would never be returned
    // and the next call would skip them.
    const previousFetchedBlock = this.lastFetchedBlock
    this.lastFetchedBlock = toBlock
    try {
      await this.updateFetcherDatabase()
    } catch (error) {
      this.lastFetchedBlock = previousFetchedBlock
      throw error
    }

    return zipWith(
      posters,
      timestamps,
      calldata,
      (poster, timestamp, calldata) => {
        return { poster, timestamp, calldata }
      },
    )
  }

  /**
   * Resolves the calldata of each event: fetches the transaction that
   * emitted it and decodes the transaction input against `abi`.
   *
   * @returns The first decoded argument of every transaction, in event order.
   */
  async eventsToCallData(events: BatchAppendedLogsType): Promise<Hex[]> {
    const txs = await Promise.all(
      events.map((event) => {
        return this.client.getTransaction(Hex(event.transactionHash))
      }),
    )

    const decoded = txs.map((tx) => {
      const { args } = decodeFunctionData({
        abi: abi,
        data: tx.input,
      })

      return Hex(args[0])
    })

    return decoded
  }

  /**
   * Resolves the timestamp of each event from the header of the block that
   * contains it.
   *
   * @returns One `Date` per event, in event order.
   */
  async eventsToTimestamps(events: BatchAppendedLogsType): Promise<Date[]> {
    const blocks = await Promise.all(
      events.map((event) => {
        return this.client.getBlockHeader(Hex(event.blockHash))
      }),
    )

    const timestamps = blocks.map((block) => {
      // The header timestamp is multiplied by 1000 before being handed to
      // `Date`, which expects milliseconds.
      return new Date(parseInt(block.timestamp.toString(), 10) * 1000)
    })

    return timestamps
  }

  /** Persists the in-memory checkpoint for the client's chain id. */
  async updateFetcherDatabase(): Promise<void> {
    const record: FetcherRecord = {
      chainId: this.client.getChainId(),
      lastFetchedBlock: this.lastFetchedBlock,
    }

    await this.fetcherRepository.addOrUpdate(record)
  }
}
/**
 * Extracts the poster address of every event, preserving event order.
 *
 * @param events - `BatchAppended` logs to read the `sender` argument from.
 * @returns The `sender` of each event wrapped with `EthereumAddress`.
 * @throws If an event has no decoded `sender` argument.
 */
function eventsToPosters(events: BatchAppendedLogsType): EthereumAddress[] {
  const posters: EthereumAddress[] = []
  for (const event of events) {
    const sender = event.args.sender
    assert(sender !== undefined, 'Unexepected lack of event sender')
    posters.push(EthereumAddress(sender))
  }
  return posters
}