-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathjwt_helper.py
90 lines (73 loc) · 2.59 KB
/
jwt_helper.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
import os
from datetime import datetime, timedelta, timezone
from functools import wraps
import jwt
from flask import jsonify, request
SECRET_KEY = os.getenv("SECRET_JWT_KEY", "SuperSecretKey")
ACCESS_TOKEN_EXPIRY = timedelta(hours=1)
REFRESH_TOKEN_EXPIRY = timedelta(days=30)
class TokenError(Exception):
"""
Custom exception for token-related errors.
"""
def __init__(self, message, status_code):
super().__init__(message)
self.status_code = status_code
self.message = message
def generate_access_token(player_id: int) -> str:
"""
Generate a short-lived JWT access token for a user.
"""
payload = {
"player_id": player_id,
"exp": datetime.now(timezone.utc) + ACCESS_TOKEN_EXPIRY, # Expiration
"iat": datetime.now(timezone.utc), # Issued at
"token_type": "access",
}
return jwt.encode(payload, SECRET_KEY, algorithm="HS256")
def generate_refresh_token(player_id: int) -> str:
"""
Generate a long-lived refresh token for a user.
"""
payload = {
"player_id": player_id,
"exp": datetime.now(timezone.utc) + REFRESH_TOKEN_EXPIRY,
"iat": datetime.now(timezone.utc),
"token_type": "refresh",
}
return jwt.encode(payload, SECRET_KEY, algorithm="HS256")
def extract_token_from_header() -> str:
"""
Extract the Bearer token from the Authorization header.
"""
auth_header = request.headers.get("Authorization")
if not auth_header or not auth_header.startswith("Bearer "):
raise TokenError("Token is missing or improperly formatted", 401)
return auth_header.split("Bearer ")[1]
def verify_token(token: str, required_type: str) -> dict:
"""
Verify and decode a JWT token.
"""
try:
decoded = jwt.decode(token, SECRET_KEY, algorithms=["HS256"])
if decoded.get("token_type") != required_type:
raise jwt.InvalidTokenError("Invalid token type")
return decoded
except jwt.ExpiredSignatureError:
raise TokenError("Token has expired", 401)
except jwt.InvalidTokenError:
raise TokenError("Invalid token", 401)
def token_required(f):
"""
Decorator to protect routes by requiring a valid token.
"""
@wraps(f)
def decorated(*args, **kwargs):
try:
token = extract_token_from_header()
decoded = verify_token(token, required_type="access")
request.player_id = decoded["player_id"]
return f(*args, **kwargs)
except TokenError as e:
return jsonify(message=e.message), e.status_code
return decorated