-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathdb.py
executable file
·33 lines (30 loc) · 1.13 KB
/
db.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
from prisma import Prisma
from datetime import datetime
class EventRepository:
def __init__(self, prisma: Prisma):
self.prisma = prisma
async def save_event(self, aixProcessed, aixDistributed, ethProcessed, ethDistributed, timestamp, txhash):
try:
return await self.prisma.event.create({
"aixProcessed": str(aixProcessed),
"aixDistributed": str(aixDistributed),
"ethProcessed": str(ethProcessed),
"ethDistributed": str(ethDistributed),
"timestamp": timestamp,
"txhash": txhash
})
except Exception as e:
print(e)
async def get_all_events_last_24h(self):
current_time = int(datetime.now().timestamp())
try:
return await self.prisma.event.find_many(
where={
'timestamp': {
'gt': current_time - 60*60*24
}
}
)
except Exception as e:
print(e)
return await self.get_all_events_last_24h()