Skip to content

Commit

Permalink
尝试修复短信验证码监听问题 (#72)
Browse files Browse the repository at this point in the history
* 尝试修复内置`listener`监听验证码的问题

* 尝试修复内置监听问题
  • Loading branch information
AntonVanke authored Jun 19, 2021
1 parent a5a605e commit b8e8e61
Show file tree
Hide file tree
Showing 2 changed files with 81 additions and 88 deletions.
6 changes: 3 additions & 3 deletions main.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,8 @@ def __init__(self):
# 初始化短信验证码配置
if not self.sms_captcha_cfg["is_ocr"]:
if not self.sms_captcha_cfg["jd_wstool"]:
from utils.listener import WebSocket
self.WebSocket = WebSocket()
from utils.listener import SmsSocket
self.sms = SmsSocket()
elif self.sms_captcha_cfg["is_ocr"]:
if self.ocr_cfg["type"] == "":
WARN("当前已开启OCR模式,但是并未选择OCR类型,请在config.yaml补充ocr.type")
Expand Down Expand Up @@ -361,7 +361,7 @@ def main(self):
if self.sms_captcha_cfg["jd_wstool"]:
recv = asyncio.get_event_loop().run_until_complete(ws_conn(ws_conn_url))
else:
recv = self.WebSocket.listener()
recv = self.sms.listener()

if recv == "":
INFO("等待websocket推送短信验证码超时,即将跳过", card["brandName"])
Expand Down
163 changes: 78 additions & 85 deletions utils/listener.py
Original file line number Diff line number Diff line change
@@ -1,124 +1,117 @@
import json
import re
import sys
import time
import threading
import socket
import json
import yaml
import psutil
from socket import AddressFamily
import signal
import socket
from pathlib import Path
from utils.logger import Log

# 日志
logger = Log().logger
# 获取配置
try:
BASE_DIR = Path(__file__).resolve().parent.parent
sms_timeout = int(
yaml.safe_load(open(str(BASE_DIR / "config.yaml"), "r", encoding="utf-8"))["sms_captcha"]["ws_timeout"])
except:
sms_timeout = 45

# 注意事项
_readme = """注意事项:
1. 手机端请求IP地址为监听地址,请先测试是否可以访问通
2. 用手机浏览器测试访问说明1中尝试过的IP地址,如访问通代表无问题
3. 以下IP获取到的IP仅做参考,如果全部访问不通,请检查防火墙开启5201端口或使用ipconfig/ifconfig查看本地其他IP
4. 记得更改手机端的请求地址,并授权软件短信权限和验证码获取权限
5. 访问测试链接后:务必关闭页面,防止浏览器一直后台请求(像:验证码一直是123456)"""


def get_inter_ip():
def timer(time, callback):
"""
查询本机所有ip地址
:return: ip
定时器
参考:@ViolainOt
"""
local_addrs = []
for name, info in psutil.net_if_addrs().items():
for addr in info:
if AddressFamily.AF_INET == addr.family:
if addr.address == "127.0.0.1":
continue
local_addrs.append(addr.address)
return local_addrs

def wrap(func):
def handle(signum, frame):
raise RuntimeError

class MyThread(threading.Thread):
def __init__(self, target, args=()):
"""
因为threading类没有返回值,因此在此处重新定义MyThread类,使线程拥有返回值
"""
super(MyThread, self).__init__()
self.func = target
self.args = args
def to_do(*args, **kwargs):
try:
signal.signal(signal.SIGALRM, handle)
signal.alarm(time)
r = func(*args, **kwargs)
signal.alarm(0)
return r
except RuntimeError as e:
return callback()

def run(self):
# 接受返回值
self.result = self.func(*self.args)
return to_do

def get_result(self):
# 线程不结束,返回值为None
try:
return self.result
except Exception:
return None
return wrap


def limit_decor(timeout, granularity):
def timeout():
"""
timeout 最大允许执行时长, 单位:秒
granularity 轮询间隔,间隔越短结果越精确同时cpu负载越高
return 未超时返回被装饰函数返回值,超时则返回 None
超时返回
"""
return ""

def functions(func):
def run(*args):
thre_func = MyThread(target=func, args=args)
thre_func.setDaemon(True)
thre_func.start()
sleep_num = int(timeout // granularity)
for i in range(0, sleep_num):
infor = thre_func.get_result()
if infor:
return infor
else:
time.sleep(granularity)
return ""

return run

return functions


class WebSocket(object):
def get_inter_ip():
"""
WebSocket服务端
查询本机所有ip地址
:return: ip
"""
local_addrs = []
for name, info in psutil.net_if_addrs().items():
for addr in info:
if socket.AddressFamily.AF_INET == addr.family:
if addr.address == "127.0.0.1":
continue
local_addrs.append(addr.address)
return local_addrs

def __init__(self):
from utils.logger import Log
self.logger = Log().logger
self.tcp_server = None
self.bind()

def bind(self):
class SmsSocket:
@staticmethod
def _readme():
font_color = ["\033[1;36m", "\033[0m"]
print(
f'{font_color[0] if sys.platform != "win32" else ""}{_readme}{font_color[1] if sys.platform != "win32" else ""}')
for idx, val in enumerate(get_inter_ip()):
logger.info(f"监听地址{idx + 1}:\thttp://{val}:5201/publish?smsCode=123456")

def __init__(self):
try:
self.tcp_server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.tcp_server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self.tcp_server.bind(("", 5201))

self.tcp_server.listen(128)
font_color = ["\033[1;36m", "\033[0m"]
print(f"""{font_color[0] if sys.platform != "win32" else ""}注意事项:
1. 手机端请求IP地址为如下监听地址,请先测试是否可以访问通
2. 用手机浏览器测试访问说明1中尝试过的IP地址,如访问通代表无问题
3. 以下IP获取到的IP仅做参考,如果全部访问不通,请检查防火墙开启5201端口或使用ipconfig/ifconfig查看本地其他IP
4. 记得更改手机端的请求地址,并授权软件短信权限和验证码获取权限{font_color[1] if sys.platform != "win32" else ""}
""")
for idx, val in enumerate(get_inter_ip()):
self.logger.info(f"监听地址{idx+1}:\thttp://{val}:5201/publish?smsCode=123456")
except:
self.logger.error("监听失败,请查看是否有同端口脚本")

# 等待时间30 轮询时间0.5
@limit_decor(30, 0.5)
self._readme()
except OSError:
logger.warning("请确保你没有打开另外一个监听脚本或jd_tools")

@timer(sms_timeout, timeout)
def listener(self, *args, **kwargs):
"""
通过 socket 监听
"""
while True:
try:
cs, ca = self.tcp_server.accept()
recv_data = cs.recv(1024)
try:
a = str(re.search(r'smsCode=(\d+)', str(recv_data)).group(1))
self.logger.info(f'监听到京东验证码:\t{a}')
logger.info(f'监听到京东验证码:\t{a}')

return json.dumps({"sms_code": a})
except AttributeError:
self.logger.warning(f"监听到IP: {ca[0]}访问,但未获取到短信验证码")
except:
return ""
logger.warning(f"监听到IP: \t{ca[0]}\t访问,但未获取到短信验证码")
except OSError:
logger.warning("请确保你没有打开另外一个监听脚本或jd_tools")


if __name__ == '__main__':
a = SmsSocket()
while True:
print(WebSocket().listener())
print(a.listener())

0 comments on commit b8e8e61

Please sign in to comment.