-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathqr_scan.py
144 lines (114 loc) · 5.99 KB
/
qr_scan.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
from cs50 import SQL
from datetime import datetime
import cv2
from pyzbar.pyzbar import decode
import numpy as np
import time
import json
db = SQL("sqlite:///management.db")
class QRScanner:
def __init__(self):
self.cap = None
self.scanned_data = []
def segregate_timestamp(self, timestamp_str):
timestamp = datetime.strptime(timestamp_str, "%Y-%m-%d %H:%M:%S")
return timestamp.year, timestamp.month, timestamp.day, timestamp.hour, timestamp.minute, timestamp.second
def start_scanner(self, camera_index=0, delay_between_scans=3):
self.cap = cv2.VideoCapture(camera_index)
last_process_time = 0
while True:
_, frame = self.cap.read()
# Convert the frame to grayscale for better QR code detection
gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
# Detect QR codes in the frame
decoded_objects = decode(gray)
for obj in decoded_objects:
# Draw a rectangle around the QR code
points = obj.polygon
if len(points) == 4:
pts = np.array([(point.x, point.y) for point in points], dtype=int)
cv2.polylines(frame, [pts], isClosed=True, color=(0, 255, 0), thickness=2)
# Get the QR code data
details = obj.data.decode('utf-8')
try:
# Try to decode the JSON-formatted string into a Python dictionary
details_dict = json.loads(details)
# Get the current timestamp
timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
details_dict["Timestamp"] = timestamp
except json.JSONDecodeError as e:
# Handle the case where the data is not valid JSON
print(f"Error decoding JSON: {e}")
# Check if the specified delay has passed since the last processed QR code
current_time = time.time()
if current_time - last_process_time >= delay_between_scans:
last_process_time = current_time
# Do something with the QR code data and timestamp (e.g., print them)
#print(f"Scanned QR Code: {details_dict}")
# Display the QR code data and timestamp on the frame
font = cv2.FONT_HERSHEY_SIMPLEX
bottom_left_corner = (pts[0][0], pts[0][1] - 10)
font_scale = 0.5
font_color = (255, 255, 255)
line_type = 1
cv2.putText(frame, f"ID: {details_dict} | Time: {timestamp}", bottom_left_corner, font, font_scale, font_color, line_type)
# Append the details to the list
self.scanned_data.append(details_dict)
if self.check_data(details_dict):
self.update_data(details_dict)
self.stop_scanner()
else:
self.write_data(details_dict)
# Segregate the timestamp components
#year, month, day, hour, minute, second = self.segregate_timestamp(timestamp)
#print(f"Year: {year}, Month: {month}, Day: {day}, Hour: {hour}, Minute: {minute}, Second: {second}")
# Display the frame with the rectangles around the QR codes
cv2.imshow("QR Code Scanner", frame)
# Non-blocking wait key
key = cv2.waitKey(1) & 0xFF
if key == 27: # Press 'Esc' to exit
break
def stop_scanner(self):
if self.cap is not None:
self.cap.release()
cv2.destroyAllWindows()
def get_scanned_data(self):
return self.scanned_data
def clear_scanned_data(self):
self.scanned_data = []
def add_revenue(self, data):
product_id = data["Product_ID"]
quantity = int(data["Quantity"])
price = db.execute("SELECT Product_Price FROM products WHERE Product_ID = ?", product_id)
product_price = price[0]["Product_Price"]
total_price = float(product_price) * quantity
year, month, day, hour, minute, second = self.segregate_timestamp(data["Timestamp"])
row = db.execute("SELECT * FROM revenue WHERE YEAR = ? AND MONTH = ?", year, month)
if row:
db.execute("UPDATE revenue SET Total_Revenue = Total_Revenue + ? WHERE YEAR = ? AND MONTH = ?", total_price, year, month)
else:
db.execute("INSERT INTO revenue (YEAR, MONTH, Total_Revenue) VALUES (?, ?, ?)", year, month, total_price)
def check_data(self, data):
col1 = data["Order_ID"]
col2 = data["Customer_ID"]
col3 = data["Product_ID"]
row = db.execute("SELECT Completed FROM jobs WHERE Order_ID = ? AND Customer_ID = ? AND Product_ID = ?", col1, col2, col3)
complete = int(row[0]["Completed"])
total = int(data["Quantity"])
if complete == total:
return True
else:
return False
def update_data(self, data):
col1 = data["Order_ID"]
col2 = data["Customer_ID"]
col3 = data["Product_ID"]
db.execute("UPDATE jobs SET Status = 'COMPLETE' WHERE Order_ID = ? AND Customer_ID = ? AND Product_ID = ?", col1, col2, col3)
db.execute("UPDATE orders SET Status = 'COMPLETE' WHERE Order_ID = ? AND Customer_ID = ? AND Product_ID = ?", col1, col2, col3)
db.execute("DELETE FROM jobs WHERE Order_ID = ? AND Customer_ID = ? AND Product_ID = ?", col1, col2, col3)
self.add_revenue(data)
def write_data(self, data):
col1 = data["Order_ID"]
col2 = data["Customer_ID"]
col3 = data["Product_ID"]
db.execute("UPDATE jobs SET Completed = Completed + 1 WHERE Order_ID = ? AND Customer_ID = ? AND Product_ID = ?", col1, col2, col3)