-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathcache_token.py
92 lines (76 loc) · 2.54 KB
/
cache_token.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
import asyncio
import json
import os
import time
import httpx
from utils.algo import get_sign
from utils.com import match_value_from_cookie, unquote_pt_pin
from utils.proxy import get_proxies
from utils.coroutines import async_print, start
async def get_isv_token(cookie, **kwargs):
"""
:return:
"""
try:
headers = {
'user-agent': 'okhttp/3.12.16;jdmall;android;version/13.1.0;build/99208;',
'content-type': 'application/x-www-form-urlencoded; charset=UTF-8',
'cookie': cookie,
}
resp = get_sign('isvObfuscator', json.dumps({"id": "", "url": "https://lzkj-isv.isvjcloud.com"}))
url = 'https://api.m.jd.com/client.action?functionId=isvObfuscator&' + resp['body']
response = httpx.post(url, headers=headers, proxies=get_proxies())
return response.json().get('token', None)
except Exception as e:
await async_print("获取Token失败, {}".format(e.args))
async def write_token(path, pin, token):
if not os.path.exists(path):
os.makedirs(path, exist_ok=True)
try:
with open(path, 'r', encoding='utf-8') as fp:
data = json.load(fp)
except Exception as e:
await async_print(f"打开文件:{path}失败, {e.args}")
data = dict()
data[pin] = {
'expires': int(time.time() * 1000) + 60 * 29 * 1000,
'val': token,
}
with open(path, 'w', encoding='utf-8') as f:
json.dump(data, f)
async def save_faker3_token(pin, token):
"""
:param pin:
:param token:
:return:
"""
path = '/ql/data/scripts/shufflewzc_faker3_main/utils/token.json'
await write_token(path, pin, token)
async def save_jdmax_token(pin, token):
"""
:param pin:
:param token:
:return:
"""
path = '/ql/data/scripts/9Rebels_jdmax/utils/token.json'
await write_token(path, pin, token)
async def save_jdm_token(pin, token):
"""
:param pin:
:param token:
:return:
"""
path = '/ql/data/scripts/6dylan6_jdm/utils/token.json'
await write_token(path, pin, token)
async def cache_token(jd_ck, **kwargs):
pin = unquote_pt_pin(match_value_from_cookie(jd_ck))
await async_print("正在获取Token!")
token = await get_isv_token(jd_ck, **kwargs)
if token:
await save_faker3_token(pin, token)
await save_jdmax_token(pin, token)
await save_jdm_token(pin, token)
await async_print("成功获取Token!")
await asyncio.sleep(1)
if __name__ == '__main__':
start(cache_token, '缓存Token', max_concurrent=3)