-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathfood_delivery.py
163 lines (137 loc) · 5.48 KB
/
food_delivery.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
import asyncio
import random
from typing import Dict, Any
BENCHMARK_NAME = "Food Delivery"
class FoodDeliverySystem:
def __init__(self) -> None:
self.restaurants: Dict[str, bool] = {}
self.orders: Dict[str, dict[str, Any]] = {}
self.riders: Dict[str, bool] = {}
self._lock = asyncio.Lock()
async def handle_event(self, event: Dict[str, Any]) -> None:
match event.get("type"):
case "restaurant_open":
await self.restaurant_open(event["restaurant_id"])
case "restaurant_closed":
await self.restaurant_closed(event["restaurant_id"])
case "new_order":
await self.new_order(
event["order_id"], event["restaurant_id"], event.get("items", [])
)
case "assign_rider":
await self.assign_rider(event["order_id"], event["rider_id"])
case "order_delivered":
await self.order_delivered(event["order_id"])
case "rider_status":
await self.update_rider_status(event["rider_id"], event["is_available"])
async def restaurant_open(self, restaurant_id: str) -> None:
async with self._lock:
self.restaurants[restaurant_id] = True
async def restaurant_closed(self, restaurant_id: str) -> None:
async with self._lock:
self.restaurants[restaurant_id] = False
async def new_order(
self, order_id: str, restaurant_id: str, items: list[dict[str, Any]]
) -> None:
await asyncio.sleep(0.1)
async with self._lock:
if not self.restaurants.get(restaurant_id, False):
return
self.orders[order_id] = {
"status": "pending",
"restaurant": restaurant_id,
"items": items,
"rider_assigned": None,
}
await self.notify_user(order_id, "Order created and pending assignment.")
async def assign_rider(self, order_id: str, rider_id: str) -> None:
await asyncio.sleep(0.1)
async with self._lock:
order = self.orders.get(order_id)
if not order or order["status"] != "pending":
return
if not self.riders.get(rider_id, False):
return
order["status"] = "in_progress"
order["rider_assigned"] = rider_id
self.riders[rider_id] = False
await asyncio.gather(
self.notify_user(
order_id, f"Order in progress. Rider '{rider_id}' assigned."
),
self.log_event("assign_rider", order_id, rider_id)
)
async def order_delivered(self, order_id: str) -> None:
await asyncio.sleep(0.1)
async with self._lock:
order = self.orders.get(order_id)
if not order:
return
rider_id = order["rider_assigned"]
if rider_id:
self.riders[rider_id] = True
order["status"] = "delivered"
await asyncio.gather(
self.notify_user(order_id, "Order delivered!"),
self.log_event("order_delivered", order_id, rider_id),
)
async def update_rider_status(self, rider_id: str, is_available: bool) -> None:
async with self._lock:
self.riders[rider_id] = is_available
async def notify_user(self, order_id: str, message: str) -> None:
await asyncio.sleep(0.1)
async def log_event(self, action: str, order_id: str, rider_id: str) -> None:
await asyncio.sleep(0.05)
async def generate_events(num_events: int) -> list[dict[str, Any]]:
event_types = [
"restaurant_open",
"restaurant_closed",
"new_order",
"assign_rider",
"order_delivered",
"rider_status",
]
events: list[dict[str, Any]] = []
for i in range(num_events):
event_type = random.choice(event_types)
if event_type == "restaurant_open":
events.append({"type": "restaurant_open", "restaurant_id": f"R{i % 5:03}"})
elif event_type == "restaurant_closed":
events.append(
{"type": "restaurant_closed", "restaurant_id": f"R{i % 5:03}"}
)
elif event_type == "new_order":
events.append(
{
"type": "new_order",
"order_id": f"O{i:03}",
"restaurant_id": f"R{i % 5:03}",
"items": ["item1", "item2"],
}
)
elif event_type == "assign_rider":
events.append(
{
"type": "assign_rider",
"order_id": f"O{i % 10:03}",
"rider_id": f"RD{i % 3:02}",
}
)
elif event_type == "order_delivered":
events.append({"type": "order_delivered", "order_id": f"O{i % 10:03}"})
elif event_type == "rider_status":
events.append(
{
"type": "rider_status",
"rider_id": f"RD{i % 3:02}",
"is_available": random.choice([True, False]),
}
)
return events
async def main(orders: int) -> None:
fds = FoodDeliverySystem()
events = await generate_events(orders)
tasks = [fds.handle_event(evt) for evt in events]
await asyncio.gather(*tasks)
def run(loop: asyncio.AbstractEventLoop, num_producers: int) -> None:
loop.run_until_complete(main(num_producers))