Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
  • Loading branch information
liangbaika committed Jan 18, 2021
1 parent 09b5a52 commit a2df806
Show file tree
Hide file tree
Showing 19 changed files with 2,482 additions and 306 deletions.
18 changes: 14 additions & 4 deletions Pipfile
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
[[source]]
name = "pypi"
url = "https://mirrors.163.com/pypi/simple/"
verify_ssl = true
name = "pypi"
url = "https://mirrors.aliyun.com/pypi/simple/"

[dev-packages]
aioredis = "*"
pytest = "*"
mkdocs = "*"
pymysql = "*"
Expand All @@ -12,14 +13,23 @@ pyppeteer = "*"
ruia = "*"
ruia-ua = "*"
requests = "*"
fastapi = "*"
#fastapi = "*"

[packages]
aioredis = "==1.3.1"
aiohttp = "*"
lxml = "*"
uvicorn = {extras = ["standard"],version = "*"}
#uvicorn = {extras = ["standard"],version = "*"}
python-multipart = "*"
jsonpath = "*"
parsel = "*"
cchardet = "*"
aiomysql = "*"
pyppeteer = "*"
redis = "*"
pytest = "*"
blinker = "*"
ruia = "*"

[requires]
python_version = "3.7"
278 changes: 0 additions & 278 deletions Pipfile.lock

This file was deleted.

85 changes: 83 additions & 2 deletions launcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@
from smart.log import log
from smart.pipline import Piplines
from smart.runer import CrawStater
from smart.setting import gloable_setting_dict
from smart.signal import reminder
from smart.spider import Spider
from spiders.db.sanicdb import SanicDB
from spiders.govs import GovsSpider, ArticelItem
from spiders.image_spider import ImageSpider
Expand All @@ -20,12 +23,19 @@

@piplinestest.pipline(1)
async def do_pip(spider_ins, item):
print(f"我是item1111111 {item.results}")
return item


@piplinestest.pipline(2)
def pip2(spider_ins, item):
print(f"我是item2 {item.results}")
print(f"我是item2222222 {item.results}")
return item


@piplinestest.pipline(3)
async def pip3(spider_ins, item):
print(f"我是item33333 {item.results}")
return item


Expand Down Expand Up @@ -57,10 +67,81 @@ def start1():
starter = CrawStater()
starter.run_single(IpSpider(), middlewire=middleware2, pipline=piplinestest)

#
# @reminder.spider_start.connect
# def test1(sender, **kwargs):
# print("spider_start1")
# return 1222222
#
#
# @reminder.spider_start.connect
# def test221(sender, **kwargs):
# print("spider_start2")
# return 33333333
#
#
# @reminder.spider_execption.connect
# def test2(sender, **kwargs):
# print("spider_execption")
#
#
# @reminder.spider_close.connect
# def tes3t(sender, **kwargs):
# print("spider_close")
#
#
# @reminder.engin_start.connect
# def test4(sender, **kwargs):
# print("engin_start")
#
#
# @reminder.engin_idle.connect
# def test5(sender, **kwargs):
# print("engin_idle")
#
#
# @reminder.engin_close.connect
# def test6(sender, **kwargs):
# print("engin_close")
#
#
# @reminder.request_dropped.connect
# def test7(sender, **kwargs):
# print("spider_start")
#
#
# @reminder.request_scheduled.connect
# def test8(sender, **kwargs):
# print("request_scheduled")
#
#
# @reminder.response_received.connect
# def test9(sender, **kwargs):
# print("response_received")
#
#
# @reminder.response_downloaded.connect
# def test10(sender, **kwargs):
# print("response_downloaded")
#
#
# @reminder.item_dropped.connect
# def test11(sender, **kwargs):
# print("spider_start")


if __name__ == '__main__':
starter = CrawStater()
spider1 = GovsSpider()
spider2 = JsonSpider()
js_spider = JsSpider()
starter.run_many([spider1], middlewire=middleware2, pipline=piplinestest)
gloable_setting_dict.update(
duplicate_filter_class="spiders.distributed.RedisBaseDuplicateFilter",
scheduler_container_class="spiders.distributed.RedisSchuler",
pipline_is_paralleled=1
)

spider = IpSpider()
# starter.run_many([spider], middlewire=middleware2, pipline=piplinestest)
starter.run_many([spider])

21 changes: 18 additions & 3 deletions smart/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import asyncio
import importlib
import inspect
import time
import uuid
from asyncio import Lock
from collections import deque
Expand Down Expand Up @@ -105,7 +106,6 @@ async def start(self):
if self.lock and self.lock.locked():
await asyncio.sleep(1)
continue

request_or_item = next(self.iter_request())
if isinstance(request_or_item, Request):
self.scheduler.schedlue(request_or_item)
Expand Down Expand Up @@ -191,12 +191,27 @@ def _check_can_stop(self, request):
return False
if len(self.task_dict) > 0:
return False
if len(self.request_generator_queue) > 0:
if len(self.pip_task_dict) > 0:
return False
if len(self.request_generator_queue) > 0 and self.scheduler.scheduler_container.size() > 0:
return False
if self.downloader.response_queue.qsize() > 0:
return False
if len(self.pip_task_dict) > 0:
if self.scheduler.scheduler_container.size() > 0:
return False
start = time.time()
while 1:
end = time.time()
if (end - start) > 1.0:
print("空转 超过10s 停止")
break
if self.scheduler.scheduler_container.size() <= 0:
time.sleep(0.05)
else:
return False

pass

return True

def _hand_piplines(self, spider_ins, item, index=0):
Expand Down
17 changes: 13 additions & 4 deletions smart/downloader.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
from smart.response import Response
from smart.scheduler import Scheduler
from smart.setting import gloable_setting_dict
from smart.signal import Reminder
from .request import Request


Expand All @@ -32,6 +33,7 @@ def fetch(self, request: Request) -> Response:
class AioHttpDown(BaseDown):

async def fetch(self, request: Request) -> Response:
print('run')
session = None
resp = None
try:
Expand Down Expand Up @@ -64,8 +66,10 @@ async def fetch(self, request: Request) -> Response:

class Downloader:

def __init__(self, scheduler: Scheduler, middwire: Middleware = None, seq=100, downer: BaseDown = AioHttpDown()):
def __init__(self, scheduler: Scheduler, middwire: Middleware = None, reminder=None, seq=100,
downer: BaseDown = AioHttpDown()):
self.log = log
self.reminder = reminder
self.scheduler = scheduler
self.middwire = middwire
self.response_queue: asyncio.Queue = Queue()
Expand All @@ -92,9 +96,9 @@ async def download(self, request: Request):
ignore_response_codes = spider.cutome_setting_dict.get("ignore_response_codes") or gloable_setting_dict.get(
"ignore_response_codes")
req_delay = spider.cutome_setting_dict.get("req_delay") or gloable_setting_dict.get("req_delay")

if request and request.retry >= max_retry:
# reached max retry times
self.reminder.go(Reminder.request_dropped, request, scheduler=self.scheduler)
self.log.error(f'reached max retry times... {request}')
return
request.retry = request.retry + 1
Expand Down Expand Up @@ -141,18 +145,23 @@ async def download(self, request: Request):
'that is a no-null response, and response must be a '
'smart.Response instance or sub Response instance. ')
return

# self.reminder.go(Reminder.response_downloaded, response)
if response.status not in ignore_response_codes:
await self._after_fetch(request, response)

if response.status not in ignore_response_codes:
response.request = request
response.__spider__ = spider
await self.response_queue.put(response)
return response

def get(self) -> Optional[Response]:
with suppress(QueueEmpty):
return self.response_queue.get_nowait()
response = self.response_queue.get_nowait()
if response:
# self.reminder.go(Reminder.response_received, response)
pass
return response

async def _before_fetch(self, request):
if self.middwire and len(self.middwire.request_middleware) > 0:
Expand Down
1 change: 0 additions & 1 deletion smart/item.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
from typing import Any, Union

from lxml import etree
from ruia.exceptions import InvalidFuncType

from smart.field import BaseField, RegexField, FuncField

Expand Down
2 changes: 1 addition & 1 deletion smart/response.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ def content(self) -> bytes:
def content_type(self) -> Optional[str]:
if self.headers:
for key in self.headers.keys():
if "content_type" == key.lower():
if "content_type" == key.lower() or "content-type" == key.lower():
return self.headers.get(key)
return None

Expand Down
2 changes: 1 addition & 1 deletion smart/runer.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
from urllib.request import urlopen

from smart.log import log
from smart.core import Engine
from smart.core2 import Engine
from smart.middlewire import Middleware
from smart.pipline import Piplines
from smart.setting import gloable_setting_dict
Expand Down
12 changes: 10 additions & 2 deletions smart/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,10 @@ def push(self, request: Request):
def pop(self) -> Optional[Request]:
pass

@abstractmethod
def size(self) -> int:
pass


class BaseDuplicateFilter(ABC):

Expand All @@ -47,12 +51,12 @@ def __init__(self):

def add(self, url):
if url:
self.set_container.add(hash(url))
self.set_container.add(url)

def contains(self, url):
if not url:
return False
if hash(url) in self.set_container:
if url in self.set_container:
return True
return False

Expand All @@ -61,6 +65,7 @@ def length(self):


class DequeSchedulerContainer(BaseSchedulerContainer):

def __init__(self):
self.url_queue = deque()

Expand All @@ -72,6 +77,9 @@ def pop(self) -> Optional[Request]:
return self.url_queue.popleft()
return None

def size(self) -> int:
return len(self.url_queue)


class Scheduler:
def __init__(self, duplicate_filter: BaseDuplicateFilter = None,
Expand Down
4 changes: 4 additions & 0 deletions smart/setting.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,10 @@
"thread_pool_max_size": 50,
# 根据响应的状态码 忽略以下响应
"ignore_response_codes": [401, 403, 404, 405, 500, 502, 504],
# 是否是分布式爬虫
"is_single": 1,
# pipline之间 处理item 是否并行处理 默认 0 串行 1 并行
"pipline_is_paralleled": 0,
# 启动时网络是否畅通检查地址
"net_healthy_check_url": "https://www.baidu.com",
# log level
Expand Down
39 changes: 39 additions & 0 deletions smart/signal.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
# -*- coding utf-8 -*-#
# ------------------------------------------------------------------
# Name: signal
# Author: liangbaikai
# Date: 2021/1/15
# Desc: there is a python file description
# ------------------------------------------------------------------
from blinker import Signal


class _Reminder:
spider_start = Signal("spider_start")
spider_execption = Signal("spider_execption")
spider_close = Signal("spider_close")
engin_start = Signal("engin_start")
engin_idle = Signal("engin_idle")
engin_close = Signal("engin_close")
request_dropped = Signal("request_dropped")
request_scheduled = Signal("request_scheduled")
response_received = Signal("response_received")
response_downloaded = Signal("response_downloaded")
item_dropped = Signal("item_dropped")

def __init__(self, *args, **kwargs):
pass

def go(self, signal: Signal, *args, **kwargs):
if signal is None:
raise ValueError("signal can not be null")
has_receivers = bool(signal.receivers)
if has_receivers:
try:
signal.send(*args, **kwargs)
except Exception as e:
pass


Reminder = _Reminder
reminder = Reminder()
Loading

0 comments on commit a2df806

Please sign in to comment.