-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathuniswap_lp_listener.py
160 lines (129 loc) · 6.88 KB
/
uniswap_lp_listener.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
# Necessary imports
from asyncio import get_event_loop, gather, sleep
from json import load, loads, dump
from web3 import Web3
import boto3
from tempfile import NamedTemporaryFile
from typing import List, Dict, Tuple
# Configuring AWS S3 bucket
s3 = boto3.resource(
service_name='s3',
region_name='us-east-2',
aws_access_key_id='<ADD_ACCESS_KEY>',
aws_secret_access_key='<ADD_SECRET_KEY>'
)
s3_bucket = "<ADD_BUCKET_NAME>"
# Initialize QuickNode Endpoint
eth = "<ADD_ETH_RPC_NODE_URL_INFURA_OR_QUICKNODE_OR_SOMETHING_ELSE>/"
# Instantiate Web3 object with quicknode endpoint
web3 = Web3(Web3.HTTPProvider(eth))
# Read necessary ABIs - dealing with 2 mainly
# One corresponds to LPs on Uniswap
# Other corresponds to ERC20 tokens that make up the LPs on Uniswap
with open("contracts/LP.json") as f:
ABI = load(f)
with open("contracts/ERC20.json") as f:
ERC20_ABI = load(f)
def convert_to_checksum(contracts:List) -> List:
"""
Function to convert list of contract addresses to checksum addresses.
Sometimes, addresses might be in all lower case and not in the right,
this function fixes such addresses
:param contracts : list of contract addresses that need to be changed toChecksum
:type contract : List
:returns : List of contract addresses with address in checksum format
:rtype : List
"""
for i in range(0, len(contracts)):
# Using inbuilt function to convert non-checksum address to checksum
contracts[i] = web3.toChecksumAddress(contracts[i])
return contracts
def track_swaps(contracts:List) -> None:
"""
Function to track swaps, does not return anything - acts as a route to get to processing_loop
where all "Swap" events emitted from LPs are listened to. This function acts as a way to initialize
the event loop
:param contracts : List of contracts that need to be listened to
:type contracts : List
"""
# List to store all filters being created
filters = []
# Safety check to convert all addresses to checksum
contracts = convert_to_checksum(contracts)
for contract in contracts:
# Initialising contract object for every contract in order to interact with and listen to contract
contract = web3.eth.contract(address=contract, abi=ABI)
# Create filter to listen to swap events from the latest blocks
filters.append(contract.events.Swap.createFilter(fromBlock='latest'))
loop = get_event_loop()
try:
# Running loop until completion of processing_loop() function
loop.run_until_complete(
gather(
processing_loop(filters)))
finally:
# close loop to free up system resources
loop.close()
async def processing_loop(event_filters:List) -> None:
"""
Function that constantly listens to all swap events, processes
them and pushes the relevant transaction data to s3 buckets. This function
does not return anything.
:param event_filters : List of web3 event filters that listens for "Swap" events on
multiple Uniswap LP contracts. Each event filter in the list
corresponds to one contract
:type event_filters : List
"""
# Runs forever
while True:
# Running loop for every contract for which event filter
# has been created in order to listen to every contract
# for swap events
for event_filter in event_filters:
# Runs accurately most times but faces some issues sometimes - TimeOuts and/or if local machine shuts down
try:
for Swap in event_filter.get_new_entries():
# Convert eveny swap event's emitted details to json format
event_json = Web3.toJSON(Swap)
event_json = loads(event_json)
# Get pool contract in order to get tokens associated with LP
pool_contract = event_json["address"]
# Create pool contract object to interact with associated LP's contract
pool_contract = web3.eth.contract(address=pool_contract, abi=ABI)
# Get address of the 2 ERC20's that make up the LP
token0_addr = pool_contract.functions.token0().call()
token1_addr = pool_contract.functions.token1().call()
# Create 2 separate contract objects for each of the ERC20 tokens to interact with them to get necessary data
token0_contract = web3.eth.contract(address=token0_addr, abi=ERC20_ABI)
token1_contract = web3.eth.contract(address=token1_addr, abi=ERC20_ABI)
# Gather symbol and token names for both ERC20 tokens
token0 = token0_contract.functions.symbol().call()
token0_name = token0_contract.functions.name().call()
token1 = token1_contract.functions.symbol().call()
token1_name = token1_contract.functions.name().call()
# Store data in the already existing variable that has swap based info
event_json["token0"] = token0 + "-----" + token0_name
event_json["token1"] = token1 + "-----" + token1_name
event_json["pool"] = token0 + "-" + token1
# Create a temporary file to dump data into in order to push to S3
temp = NamedTemporaryFile(mode="w+")
dump(event_json, temp)
temp.flush()
# Push data to s3 bucket where the file name is <transactionHash>.json
# This aspect could be done in a better fashion to reduce time to read data
# from s3 buckets. Instead of streaming processing and pushing data immediately after every
# transaction, a batch function could be written where after every 10-20 transactions, data is
# pushed to s3 bucket. This would reduce time of reading data from s3 drastically. Not done now
# due to time contraints.
s3.Bucket(s3_bucket).upload_file(Filename=temp.name, Key=event_json["transactionHash"]+".json")
# Delete temporary file
temp.close()
# Print transaction data to stdout - helps keep track of information regularly as well.
print(event_json, "\n------------")
# Give a small break after every event - increasing time here could help reduce timeouts
await sleep(0.01)
except ValueError:
#In case event not found - happens in some cases
print("Event Not Not Found or if local system shuts down --- ValueError: {'code': -32000, 'message': 'filter not found'}")
continue
await sleep(0.01)