-
Notifications
You must be signed in to change notification settings - Fork 1
/
tokens.py
188 lines (153 loc) · 6.67 KB
/
tokens.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
# tokens.py
# Schwab Token file contains authentication info for schwab api.
# Author: Calvin Seamons
# Last Updated: 24 October, 2024
# Imports
# ------------------ #
import base64
import logging
import os
import requests
import sys
import time
import webbrowser
import yaml
# From Imports
# ------------------ #
from pathlib import Path
# Local File Imports
# ------------------ #
from encryption import decrypt_file_with_password, encrypt_file_with_password
logging.basicConfig(
level=logging.ERROR,
format="\033[91m[ Error ] %(message)s\033[0m"
)
def global_error(message):
logging.error(message)
sys.exit(1)
class Tokens:
def __init__(self, args):
self.base_url = "https://api.schwabapi.com/trader/v1"
self.base_install = args.install_path
self.tokenfile = os.path.join(self.base_install, 'tokens.yaml')
self.credfile = os.path.join(self.base_install, 'schwab-credentials.yaml')
time_experation = self.check_time()
if time_experation == True or args.startup == True:
cred = self.get_app_creds()
self.app_key = cred['app_key']
self.app_secret = cred['app_secret']
self.build_cred()
token_cred = self.get_token_creds()
self.refresh_token = token_cred['refresh_token']
self.access_token = token_cred['access_token']
def get_app_creds(self):
if os.path.isfile(self.credfile):
schwab_cred = decrypt_file_with_password(self.credfile)
if 'app_key' in schwab_cred and schwab_cred['app_key'] is not None and \
schwab_cred['app_key'] != "" and 'app_secret' in schwab_cred and \
schwab_cred['app_secret'] is not None and schwab_cred['app_secret'] != "":
return schwab_cred
else:
global_error("Missing app_key or app_secret in schwab_credentials.yaml. run --startup.")
else:
global_error("schwab-credentials.yaml doesn't exist in ~/.schwab_auto_trader. run --startup.")
def get_token_creds(self):
if os.path.isfile(self.tokenfile):
return decrypt_file_with_password(self.tokenfile)
else:
global_error("tokens.yaml doesn't exist in ~/.schwab_auto_trader. run --startup.")
def check_time(self): # Checks the experation time of the tokenfile.
exp = os.path.getmtime(self.tokenfile)
current_time = time.time()
token_age = current_time-exp
if token_age > 604800: # Your token is most likely expired, refresh needed.
return True
elif token_age > 475200: # If token file hasn't been updated in 5days12hr it is expiring soon.
logging.warning("Your refresh may expire soon. Please run --startup")
return False
else:
# Everything is fine.
return False
def construct_init_auth_url(self) -> tuple[str, str, str]:
try:
data = decrypt_file_with_password(self.credfile)
app_key = data['app_key']
app_secret = data['app_secret']
except Exception as e:
global_error(e)
auth_url = f"https://api.schwabapi.com/v1/oauth/authorize?client_id={app_key}&redirect_uri=https://127.0.0.1"
logging.info("Click to authenticate:")
logging.info(auth_url)
return app_key, app_secret, auth_url
def construct_headers_and_payload(self,returned_url, app_key, app_secret):
response_code = f"{returned_url[returned_url.index('code=') + 5: returned_url.index('%40')]}@"
credentials = f"{app_key}:{app_secret}"
base64_credentials = base64.b64encode(credentials.encode("utf-8")).decode(
"utf-8"
)
headers = {
"Authorization": f"Basic {base64_credentials}",
"Content-Type": "application/x-www-form-urlencoded",
}
payload = {
"grant_type": "authorization_code",
"code": response_code,
"redirect_uri": "https://127.0.0.1",
}
return headers, payload
def retrieve_tokens(self,headers, payload) -> dict:
init_token_response = requests.post(
url="https://api.schwabapi.com/v1/oauth/token",
headers=headers,
data=payload,
)
init_tokens_dict = init_token_response.json()
return init_tokens_dict
def build_cred(self):
app_key, app_secret, cs_auth_url = self.construct_init_auth_url()
webbrowser.open(cs_auth_url)
logging.info("Paste Returned URL:")
returned_url = input("\nPaste Returned URL:")
init_token_headers, init_token_payload = self.construct_headers_and_payload(returned_url, app_key, app_secret)
init_tokens_dict = self.retrieve_tokens(headers=init_token_headers, payload=init_token_payload)
if os.path.isfile(self.tokenfile): #We are resetting tokens so delete if present.
os.remove(self.tokenfile)
with open(self.tokenfile, 'w') as yaml_file:
yaml.dump(init_tokens_dict, yaml_file, default_flow_style=False)
encrypt_file_with_password(self.tokenfile)
def _refresh_token(self):
app_cred = self.get_app_creds() # Retrieve
app_key = app_cred['app_key']
app_secret = app_cred['app_secret']
token_cred = self.get_token_creds()
refresh_cred = token_cred['refresh_token']
payload = {
"grant_type": "refresh_token",
"refresh_token": refresh_cred,
}
headers = {
"Authorization": f'Basic {base64.b64encode(f"{app_key}:{app_secret}".encode()).decode()}',
"Content-Type": "application/x-www-form-urlencoded",
}
refresh_token_response = requests.post(
url="https://api.schwabapi.com/v1/oauth/token",
headers=headers,
data=payload,
)
if refresh_token_response.status_code == 200:
logging.info("Retrieved new tokens successfully using refresh token.")
else:
logging.error(
f"Error refreshing access token: {refresh_token_response.text}"
)
return None
refresh_token_dict = refresh_token_response.json()
if os.path.isfile(self.tokenfile): #We are resetting tokens so delete if present.
os.remove(self.tokenfile)
with open(self.tokenfile, 'w') as yaml_file:
yaml.dump(refresh_token_dict, yaml_file, default_flow_style=False)
encrypt_file_with_password(self.tokenfile)
#logging.debug(refresh_token_dict)
os.environ['secret_access_token'] = refresh_token_dict['access_token']
logging.info("Token dict refreshed.")
return refresh_token_dict