diff --git a/launcher.py b/launcher.py index 4c36291..908095d 100644 --- a/launcher.py +++ b/launcher.py @@ -4,6 +4,8 @@ import time from multiprocessing.pool import Pool +from blinker import Signal + from smart.log import log from smart.pipline import Piplines from smart.runer import CrawStater @@ -23,13 +25,15 @@ @piplinestest.pipline(1) async def do_pip(spider_ins, item): + await asyncio.sleep(1) print(f"我是item1111111 {item.results}") return item @piplinestest.pipline(2) -def pip2(spider_ins, item): +async def pip2(spider_ins, item): print(f"我是item2222222 {item.results}") + await asyncio.sleep(0.5) return item @@ -136,6 +140,6 @@ def test11(sender, **kwargs): spider2 = JsonSpider() js_spider = JsSpider() spider = IpSpider() + spider22 = IpSpider() starter.run_many([spider], middlewire=middleware2, pipline=piplinestest) # starter.run_many([spider]) - diff --git a/launcher_ds.py b/launcher_ds.py index 96991f1..6f92006 100644 --- a/launcher_ds.py +++ b/launcher_ds.py @@ -28,7 +28,7 @@ async def do_pip(spider_ins, item): @piplinestest.pipline(2) -def pip2(spider_ins, item): +async def pip2(spider_ins, item): print(f"我是item2222222 {item.results}") return item @@ -138,10 +138,10 @@ def xxx(sender, **kwargs): gloable_setting_dict.update( duplicate_filter_class="spiders.distributed.RedisBaseDuplicateFilter", scheduler_container_class="spiders.distributed.RedisSchuler", - pipline_is_paralleled=1 + pipline_is_paralleled=1, + is_single=0, ) spider = IpSpider() starter.run_many([spider], middlewire=middleware2, pipline=piplinestest) # starter.run_many([spider]) - diff --git a/smart/buffer/__init__.py b/smart/buffer/__init__.py new file mode 100644 index 0000000..ef4c551 --- /dev/null +++ b/smart/buffer/__init__.py @@ -0,0 +1,8 @@ +# -*- coding utf-8 -*-# +# ------------------------------------------------------------------ +# Name: __init__.py +# Author: liangbaikai +# Date: 2021/1/18 +# Desc: there is a python file description +# ------------------------------------------------------------------ + diff --git a/smart/buffer/request_buffer.py b/smart/buffer/request_buffer.py new file mode 100644 index 0000000..b83f5a1 --- /dev/null +++ b/smart/buffer/request_buffer.py @@ -0,0 +1,128 @@ +# -*- coding utf-8 -*-# +# ------------------------------------------------------------------ +# Name: request_buffer +# Author: liangbaikai +# Date: 2021/1/18 +# Desc: there is a python file description +# ------------------------------------------------------------------ +# -*- coding: utf-8 -*- +import collections +import threading +import time + +MAX_URL_COUNT = 100 # 缓存中最大request数 + + +class Singleton(object): + def __new__(cls, *args, **kwargs): + if not hasattr(cls, "_inst"): + cls._inst = super(Singleton, cls).__new__(cls) + return cls._inst + + +class RequestBuffer(threading.Thread, Singleton): + dedup = None + + def __init__(self, table_folder): + if not hasattr(self, "_requests_deque"): + super(RequestBuffer, self).__init__() + + self._thread_stop = False + self._is_adding_to_db = False + + self._requests_deque = collections.deque() + self._del_requests_deque = collections.deque() + self._db = RedisDB() + + def run(self): + while not self._thread_stop: + try: + self.__add_request_to_db() + except Exception as e: + print(e) + time.sleep(5) + + def stop(self): + self._thread_stop = True + + def put_request(self, request): + self._requests_deque.append(request) + + if self.get_requests_count() > MAX_URL_COUNT: # 超过最大缓存,主动调用 + self.flush() + + def put_del_request(self, request): + 
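# --- illustrative sketch, not part of the patch -------------------------------
# The new RequestBuffer above relies on a __new__-based Singleton plus a
# hasattr() guard in __init__, so repeated instantiation neither creates a
# second object nor resets the deques. A minimal standalone version of that
# pattern (class names here are hypothetical):
import collections
import threading


class Singleton:
    def __new__(cls, *args, **kwargs):
        if not hasattr(cls, "_inst"):
            cls._inst = super().__new__(cls)
        return cls._inst


class BufferLike(threading.Thread, Singleton):
    def __init__(self):
        # __init__ runs on every construction; the guard keeps state intact
        if not hasattr(self, "_items"):
            super().__init__(daemon=True)
            self._items = collections.deque()

    def put(self, item):
        self._items.append(item)


if __name__ == "__main__":
    a, b = BufferLike(), BufferLike()
    a.put("req-1")
    assert a is b and len(b._items) == 1
# -------------------------------------------------------------------------------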
self._del_requests_deque.append(request) + + def flush(self): + try: + self.__add_request_to_db() + except Exception as e: + print(e) + + def get_requests_count(self): + return len(self._requests_deque) + + def is_adding_to_db(self): + return self._is_adding_to_db + + def __add_request_to_db(self): + request_list = [] + prioritys = [] + callbacks = [] + + while self._requests_deque: + request = self._requests_deque.popleft() + self._is_adding_to_db = True + + if callable(request): + # 函数 + # 注意:应该考虑闭包情况。闭包情况可写成 + # def test(xxx = xxx): + # # TODO 业务逻辑 使用 xxx + # 这么写不会导致xxx为循环结束后的最后一个值 + callbacks.append(request) + continue + + priority = request.priority + + # 如果需要去重并且库中已重复 则continue + if ( + request.filter_repeat + and setting.REQUEST_FILTER_ENABLE + and not self.__class__.dedup.add(request.fingerprint) + ): + continue + else: + request_list.append(str(request.to_dict)) + prioritys.append(priority) + + if len(request_list) > MAX_URL_COUNT: + self._db.zadd(self._table_request, request_list, prioritys) + request_list = [] + prioritys = [] + + # 入库 + if request_list: + self._db.zadd(self._table_request, request_list, prioritys) + + # 执行回调 + for callback in callbacks: + try: + callback() + except Exception as e: + log.exception(e) + + # 删除已做任务 + if self._del_requests_deque: + request_done_list = [] + while self._del_requests_deque: + request_done_list.append(self._del_requests_deque.popleft()) + + # 去掉request_list中的requests, 否则可能会将刚添加的request删除 + request_done_list = list(set(request_done_list) - set(request_list)) + + if request_done_list: + self._db.zrem(self._table_request, request_done_list) + + self._is_adding_to_db = False diff --git a/smart/core.py b/smart/core.py index d066836..123e3d2 100644 --- a/smart/core.py +++ b/smart/core.py @@ -21,6 +21,7 @@ from smart.request import Request from smart.scheduler import Scheduler from smart.setting import gloable_setting_dict +from smart.signal import reminder class Engine: @@ -31,13 +32,14 @@ def __init__(self, spider, middlewire=None, pipline: Piplines = None): self.spider = spider self.middlewire = middlewire self.piplines = pipline + self.reminder = reminder duplicate_filter_class = self._get_dynamic_class_setting("duplicate_filter_class") scheduler_container_class = self._get_dynamic_class_setting("scheduler_container_class") net_download_class = self._get_dynamic_class_setting("net_download_class") self.scheduler = Scheduler(duplicate_filter_class(), scheduler_container_class()) req_per_concurrent = self.spider.cutome_setting_dict.get("req_per_concurrent") or gloable_setting_dict.get( "req_per_concurrent") - self.downloader = Downloader(self.scheduler, self.middlewire, seq=req_per_concurrent, + self.downloader = Downloader(self.scheduler, self.middlewire, seq=req_per_concurrent,reminder=self.reminder, downer=net_download_class()) self.request_generator_queue = deque() self.stop = False diff --git a/smart/core2.py b/smart/core2.py index c9fb1e7..40b701c 100644 --- a/smart/core2.py +++ b/smart/core2.py @@ -21,6 +21,7 @@ from smart.item import Item from smart.pipline import Piplines from smart.request import Request +from smart.response import Response from smart.scheduler import Scheduler, DequeSchedulerContainer from smart.setting import gloable_setting_dict from smart.signal import reminder, Reminder @@ -62,7 +63,7 @@ def _get_dynamic_class_setting(self, key): key) _module = importlib.import_module(".".join(class_str.split(".")[:-1])) _class = getattr(_module, class_str.split(".")[-1]) - self.log.info("_class:"+_class.__name__) + 
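# --- illustrative sketch, not part of the patch -------------------------------
# _get_dynamic_class_setting() above resolves dotted-path strings from the
# settings dict into class objects via importlib. A standalone equivalent; the
# real defaults are paths like "smart.scheduler.DequeSchedulerContainer", while
# collections.OrderedDict is used here only so the sketch runs anywhere:
import importlib


def load_class(dotted_path: str):
    module_path, _, class_name = dotted_path.rpartition(".")
    module = importlib.import_module(module_path)
    return getattr(module, class_name)


if __name__ == "__main__":
    cls = load_class("collections.OrderedDict")
    print(cls())
# -------------------------------------------------------------------------------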
self.log.info("_class:" + _class.__name__) return _class def iter_request(self): @@ -125,6 +126,10 @@ async def start(self): if self.lock and self.lock.locked(): await asyncio.sleep(0.5) continue + if self.is_single: + if len(self.task_dict) > 1500: + await asyncio.sleep(0.3) + request_or_item = next(self.iter_request()) if isinstance(request_or_item, Request): @@ -141,8 +146,8 @@ async def start(self): f" here is no request and the task has been completed.so engine will stop ..") self.stop = True break - if request_or_item is None: - await asyncio.sleep(0.04) + # if request_or_item is None: + await asyncio.sleep(0.001) if self.spider.state != "runing": self.spider.state = "runing" @@ -159,11 +164,9 @@ async def start(self): async def work(self): while not self.stop: - if not self.is_single: - if len(self.task_dict) > 2000: - await asyncio.sleep(0.03) - if not self.is_single and self.scheduler.scheduler_container.size() <= 10: - await asyncio.sleep(0.06) + # if not self.is_single: + # if len(self.task_dict) > 2000: + # await asyncio.sleep(0.03) request = self.scheduler.get() if isinstance(request, Request): setattr(request, "__spider__", self.spider) @@ -178,12 +181,7 @@ async def work(self): # let the_downloader can be scheduled, test 0.001-0.0006 is better # uniform = random.uniform(0.0001, 0.006) # await asyncio.sleep(0.0005) - if not self.is_single: - # 分布式爬虫 此处可以把更多的任务放在共享队列里 - await asyncio.sleep(0.017) - else: - # 单机的话 拼命调度 - await asyncio.sleep(0.001) + await asyncio.sleep(0.001) continue custome_callback = resp.request.callback if custome_callback: @@ -232,6 +230,7 @@ def _ensure_future(self, request: Request): self.task_dict[key] = task task.add_done_callback(self._check_complete_callback) + def _handle_exception(self, spider, e): if spider: try: diff --git a/smart/core3.py b/smart/core3.py new file mode 100644 index 0000000..e8c5565 --- /dev/null +++ b/smart/core3.py @@ -0,0 +1,256 @@ +# -*- coding utf-8 -*-# +# ------------------------------------------------------------------ +# Name: core +# Author: liangbaikai +# Date: 2020/12/22 +# Desc: there is a python file description +# ------------------------------------------------------------------ +import asyncio +import importlib +import inspect +import time +import uuid +from asyncio import Lock +from collections import deque +from typing import Dict + +from smart.log import log +from smart.downloader import Downloader +from smart.item import Item +from smart.pipline import Piplines +from smart.request import Request +from smart.scheduler import Scheduler +from smart.setting import gloable_setting_dict +from smart.signal import reminder, Reminder + + +class Engine: + def __init__(self, spider, middlewire=None, pipline: Piplines = None): + self.lock = None + self.reminder = reminder + self.task_dict: Dict[str, asyncio.Task] = {} + self.pip_task_dict: Dict[str, asyncio.Task] = {} + self.spider = spider + self.middlewire = middlewire + self.piplines = pipline + duplicate_filter_class = self._get_dynamic_class_setting("duplicate_filter_class") + scheduler_container_class = self._get_dynamic_class_setting("scheduler_container_class") + net_download_class = self._get_dynamic_class_setting("net_download_class") + self.scheduler = Scheduler(duplicate_filter_class(), scheduler_container_class()) + req_per_concurrent = self.spider.cutome_setting_dict.get("req_per_concurrent") or gloable_setting_dict.get( + "req_per_concurrent") + self.downloader = Downloader(self.scheduler, self.middlewire, seq=req_per_concurrent, + 
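# --- illustrative sketch, not part of the patch -------------------------------
# The `if len(self.task_dict) > 1500: await asyncio.sleep(0.3)` check added above
# is a simple backpressure valve: when too many download tasks are in flight the
# producer loop yields instead of scheduling more. Reduced sketch; MAX_IN_FLIGHT
# and the sleep interval are made-up values (the engine uses 1500 / 0.3s):
import asyncio

MAX_IN_FLIGHT = 5


async def produce(task_dict: dict, jobs: int):
    for i in range(jobs):
        while len(task_dict) >= MAX_IN_FLIGHT:   # too much in flight -> back off
            await asyncio.sleep(0.01)
        task = asyncio.ensure_future(asyncio.sleep(0.05))
        task_dict[i] = task
        task.add_done_callback(lambda t, key=i: task_dict.pop(key))
    await asyncio.gather(*task_dict.values())    # drain whatever is still running


if __name__ == "__main__":
    asyncio.run(produce({}, 20))
# -------------------------------------------------------------------------------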
reminder=self.reminder, + downer=net_download_class()) + self.request_generator_queue = deque() + self.stop = False + single = self.spider.cutome_setting_dict.get("is_single") + self.is_single = gloable_setting_dict.get("is_single") if single is None else single + + pipline_is_paralleled = self.spider.cutome_setting_dict.get("pipline_is_paralleled") + pipline_is_paralleled = gloable_setting_dict.get( + "pipline_is_paralleled") if pipline_is_paralleled is None else pipline_is_paralleled + self.pipline_is_paralleled = pipline_is_paralleled + self.log = log + + def _get_dynamic_class_setting(self, key): + class_str = self.spider.cutome_setting_dict.get( + key) or gloable_setting_dict.get( + key) + _module = importlib.import_module(".".join(class_str.split(".")[:-1])) + _class = getattr(_module, class_str.split(".")[-1]) + return _class + + def iter_request(self): + while True: + if not self.request_generator_queue: + yield None + continue + request_generator = self.request_generator_queue[0] + spider, real_request_generator = request_generator[0], request_generator[1] + try: + # execute and get a request from cutomer code + # request=real_request_generator.send(None) + request_or_item = next(real_request_generator) + except StopIteration: + self.request_generator_queue.popleft() + continue + except Exception as e: + # 可以处理异常 + self.request_generator_queue.popleft() + self._handle_exception(spider, e) + continue + yield request_or_item + + def _check_complete_pip(self, task): + if task.cancelled(): + self.log.debug(f" a task canceld ") + return + if task and task.done() and task._key: + if task.exception(): + self.log.error(f"a task occurer error in pipline {task.exception()} ") + else: + self.log.debug(f"a task done ") + result = task.result() + if result and isinstance(result, Item): + if hasattr(task, '_index'): + self._hand_piplines(task._spider, result, task._index + 1) + self.pip_task_dict.pop(task._key) + + def _check_complete_callback(self, task): + if task.cancelled(): + self.log.debug(f" a task canceld ") + return + if task and task.done() and task._key: + self.log.debug(f"a task done ") + self.task_dict.pop(task._key) + + async def start(self): + self.spider.on_start() + self.request_generator_queue.append((self.spider, iter(self.spider))) + + self.reminder.go(Reminder.spider_start, self.spider) + self.reminder.go(Reminder.engin_start, self) + # core implenment + while not self.stop: + # paused + if self.lock and self.lock.locked(): + await asyncio.sleep(1) + continue + if not self.is_single: + if len(self.task_dict) > 2000: + await asyncio.sleep(0.5) + + request_or_item = next(self.iter_request()) + if isinstance(request_or_item, Request): + self.scheduler.schedlue(request_or_item) + + if isinstance(request_or_item, Item): + self._hand_piplines(self.spider, request_or_item) + + request = self.scheduler.get() + + if isinstance(request, Request): + request.__spider__ = self.spider + self._ensure_future(request) + + can_stop = self._check_can_stop(request) + if can_stop: + # there is no request and the task has been completed.so ended + self.log.debug( + f" here is no request and the task has been completed.so engine will stop ..") + self.stop = True + break + + resp = self.downloader.get() + + if resp is None: + # let the_downloader can be scheduled, test 0.001-0.0006 is better + await asyncio.sleep(0.001) + continue + + custome_callback = resp.request.callback + if custome_callback: + request_generator = custome_callback(resp) + if request_generator: + 
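# --- illustrative sketch, not part of the patch -------------------------------
# _check_complete_pip() above chains piplines: when the task for stage N finishes
# and still returns an Item, its done-callback schedules stage N+1 with that
# result. A reduced model of that chaining; the two pipeline coroutines below are
# hypothetical:
import asyncio


async def pipe_a(item):
    item["a"] = True
    return item


async def pipe_b(item):
    item["b"] = True
    print("final item:", item)
    return item


PIPELINES = [pipe_a, pipe_b]


def run_stage(item, index=0):
    if item is None or index >= len(PIPELINES):
        return
    task = asyncio.ensure_future(PIPELINES[index](item))
    # when stage N is done, its callback schedules stage N+1 with the result
    task.add_done_callback(lambda t: run_stage(t.result(), index + 1))


async def main():
    run_stage({"url": "http://example.com"})
    await asyncio.sleep(0.1)      # give the chained tasks time to run


if __name__ == "__main__":
    asyncio.run(main())
# -------------------------------------------------------------------------------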
self.request_generator_queue.append((custome_callback.__self__, request_generator)) + # self.request_generator_queue.append( request_generator) + if self.spider.state != "runing": + self.spider.state = "runing" + + self.spider.state = "closed" + self.reminder.go(Reminder.spider_close, self.spider) + self.spider.on_close() + # wait some resource to freed + self.reminder.go(Reminder.engin_close, self.spider) + await asyncio.sleep(0.3) + self.log.debug(f" engine stoped..") + + def pause(self): + self.log.info(f" out called pause.. so engine will pause.. ") + asyncio.create_task(self._lock()) + self.spider.state = "pause" + + def recover(self): + if self.lock and self.lock.locked(): + self.log.info(f" out called recover.. so engine will recover.. ") + self.lock.release() + + def close(self): + # can make external active end engine + self.stop = True + tasks = asyncio.all_tasks() + for it in tasks: + it.cancel() + asyncio.gather(*tasks, return_exceptions=True) + self.log.debug(f" out called stop.. so engine close.. ") + + async def _lock(self): + if self.lock is None: + self.lock = Lock() + await self.lock.acquire() + + def _ensure_future(self, request: Request): + # compatible py_3.6 + task = asyncio.ensure_future(self.downloader.download(request)) + key = str(uuid.uuid4()) + task._key = key + self.task_dict[key] = task + task.add_done_callback(self._check_complete_callback) + + def _handle_exception(self, spider, e): + if spider: + try: + self.log.error(f" occured exceptyion e {e} ", exc_info=True) + spider.on_exception_occured(e) + except BaseException: + pass + + def _check_can_stop(self, request): + if request: + return False + if len(self.task_dict) > 0: + return False + if len(self.pip_task_dict) > 0: + return False + if len(self.request_generator_queue) > 0 and self.scheduler.scheduler_container.size() > 0: + return False + if self.downloader.response_queue.qsize() > 0: + return False + if self.scheduler.scheduler_container.size() > 0: + return False + start = time.time() + while not self.is_single: + end = time.time() + if (end - start) > 10.0: + print("空转 超过10s 停止") + break + if self.scheduler.scheduler_container.size() <= 0: + time.sleep(0.05) + else: + return False + return True + + def _hand_piplines(self, spider_ins, item, index=0): + if self.piplines is None or len(self.piplines.piplines) <= 0: + self.log.info("get a item but can not find a piplinse to handle it so ignore it ") + return + + if len(self.piplines.piplines) < index + 1: + return + + pip = self.piplines.piplines[index][1] + + if not callable(pip): + return + + if not inspect.iscoroutinefunction(pip): + task = asyncio.get_running_loop().run_in_executor(None, pip, spider_ins, item) + else: + task = asyncio.ensure_future(pip(spider_ins, item)) + key = str(uuid.uuid4()) + task._key = key + task._index = index + task._spider = spider_ins + self.pip_task_dict[key] = task + task.add_done_callback(self._check_complete_pip) diff --git a/smart/core4.py b/smart/core4.py new file mode 100644 index 0000000..29c1f56 --- /dev/null +++ b/smart/core4.py @@ -0,0 +1,326 @@ +# -*- coding utf-8 -*-# +# ------------------------------------------------------------------ +# Name: core +# Author: liangbaikai +# Date: 2020/12/22 +# Desc: there is a python file description +# ------------------------------------------------------------------ +import asyncio +import importlib +import inspect +import random +import time +import uuid +from asyncio import Lock, QueueEmpty +from collections import deque +from contextlib import suppress +from typing 
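# --- illustrative sketch, not part of the patch -------------------------------
# pause()/recover() above work by holding an asyncio.Lock: the main loop checks
# lock.locked() each iteration and idles while it is held; recover() releases it.
# Reduced standalone model of that pause/resume mechanism:
import asyncio


class PausableLoop:
    def __init__(self):
        self.lock = asyncio.Lock()
        self.count = 0

    async def run(self, iterations: int):
        while self.count < iterations:
            if self.lock.locked():          # paused -> idle until released
                await asyncio.sleep(0.05)
                continue
            self.count += 1
            await asyncio.sleep(0.01)

    async def pause(self):
        await self.lock.acquire()

    def recover(self):
        if self.lock.locked():
            self.lock.release()


async def main():
    loop = PausableLoop()
    runner = asyncio.ensure_future(loop.run(10))
    await asyncio.sleep(0.03)
    await loop.pause()                      # freeze the run() loop
    paused_at = loop.count
    await asyncio.sleep(0.2)
    assert loop.count == paused_at          # no progress while paused
    loop.recover()
    await runner


if __name__ == "__main__":
    asyncio.run(main())
# -------------------------------------------------------------------------------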
import Dict, Any + +from smart.log import log +from smart.downloader import Downloader +from smart.item import Item +from smart.pipline import Piplines +from smart.request import Request +from smart.response import Response +from smart.scheduler import Scheduler, DequeSchedulerContainer, AsyncScheduler +from smart.setting import gloable_setting_dict +from smart.signal import reminder, Reminder + + +class Engine: + def __init__(self, spider, middlewire=None, pipline: Piplines = None): + self.reminder = reminder + self.log = log + self.lock = None + self.task_dict: Dict[str, Any] = {} + self.pip_task_dict: Dict[str, asyncio.Task] = {} + self.spider = spider + self.middlewire = middlewire + self.piplines = pipline + duplicate_filter_class = self._get_dynamic_class_setting("duplicate_filter_class") + scheduler_container_class = self._get_dynamic_class_setting("scheduler_container_class") + net_download_class = self._get_dynamic_class_setting("net_download_class") + scheduler_class = self._get_dynamic_class_setting("scheduler_class") + self.scheduler = scheduler_class(duplicate_filter_class(), scheduler_container_class()) + req_per_concurrent = self.spider.cutome_setting_dict.get("req_per_concurrent") or gloable_setting_dict.get( + "req_per_concurrent") + single = self.spider.cutome_setting_dict.get("is_single") + self.is_single = gloable_setting_dict.get("is_single") if single is None else single + self.downloader = Downloader(self.scheduler, self.middlewire, reminder=self.reminder, + seq=req_per_concurrent, + downer=net_download_class()) + self.request_generator_queue = deque() + self.stop = False + self.condition = asyncio.Condition() + self.item_queue = asyncio.Queue() + pipline_is_paralleled = self.spider.cutome_setting_dict.get("pipline_is_paralleled") + pipline_is_paralleled = gloable_setting_dict.get( + "pipline_is_paralleled") if pipline_is_paralleled is None else pipline_is_paralleled + self.pipline_is_paralleled = pipline_is_paralleled + + def _get_dynamic_class_setting(self, key): + class_str = self.spider.cutome_setting_dict.get( + key) or gloable_setting_dict.get( + key) + _module = importlib.import_module(".".join(class_str.split(".")[:-1])) + _class = getattr(_module, class_str.split(".")[-1]) + self.log.info("_class:" + _class.__name__) + return _class + + def iter_request(self): + while True: + if not self.request_generator_queue: + yield None + continue + request_generator = self.request_generator_queue[0] + spider, real_request_generator = request_generator[0], request_generator[1] + try: + request_or_item = next(real_request_generator) + except StopIteration: + self.request_generator_queue.popleft() + continue + except Exception as e: + # 可以处理异常 + self.request_generator_queue.popleft() + self._handle_exception(spider, e) + self.reminder.go(Reminder.spider_execption, spider, exception=e) + continue + yield request_or_item + + def _check_complete_pip(self, task): + if task.cancelled(): + self.log.debug(f" a task canceld ") + return + if task and task.done() and task._key: + if task.exception(): + self.log.error(f"a task occurer error in pipline {task.exception()} ") + else: + self.log.debug(f"a task done ") + result = task.result() + if result and isinstance(result, Item): + if hasattr(task, '_index'): + self._hand_piplines(task._spider, result, index=task._index + 1, paralleled=False) + self.pip_task_dict.pop(task._key) + + def _check_complete_callback(self, task): + if task.cancelled(): + self.log.debug(f" a task canceld ") + return + if task and task.done() and task._key: 
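# --- illustrative sketch, not part of the patch -------------------------------
# iter_request() above keeps a deque of (spider, generator) pairs, always pulls
# the next value from the generator at the head, drops it on StopIteration, and
# yields None when the deque is empty so the caller can keep looping. Reduced
# standalone version of that multiplexing generator:
from collections import deque


def iter_values(generator_queue: deque):
    while True:
        if not generator_queue:
            yield None            # nothing queued right now, let the caller idle
            continue
        gen = generator_queue[0]
        try:
            yield next(gen)
        except StopIteration:
            generator_queue.popleft()


if __name__ == "__main__":
    queue = deque([iter([1, 2]), iter([3])])
    stream = iter_values(queue)
    print([next(stream) for _ in range(4)])   # -> [1, 2, 3, None]
# -------------------------------------------------------------------------------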
+ self.log.debug(f"a task done ") + self.task_dict.pop(task._key) + + async def start(self): + self.spider.on_start() + self.reminder.go(Reminder.spider_start, self.spider) + self.reminder.go(Reminder.engin_start, self) + + self.request_generator_queue.append((self.spider, iter(self.spider))) + handle_items = [asyncio.ensure_future(self.handle_item()) for _ in range(50)] + works = [asyncio.ensure_future(self.work()) for _ in range(50)] + while not self.stop: + # 是否暂停了 + if self.lock and self.lock.locked(): + await asyncio.sleep(0.5) + continue + # 若是分布式爬虫 让内存里的任务不过堆积过多 尽量均分给其他机器 + if self.is_single: + if len(self.task_dict) > 1500: + await asyncio.sleep(0.3) + waited, wait = False, None + user_func_res = next(self.iter_request()) + if isinstance(user_func_res, Request): + wait = self.scheduler.schedlue(user_func_res) + + if isinstance(user_func_res, (dict, Item)): + wait = self.item_queue.put(user_func_res) + if wait is not None and inspect.isawaitable(wait): + waited = True + await wait + + # if not wait: + # waited = True + # await asyncio.sleep(0.1) + + await asyncio.sleep(0.001) + if self._check_can_stop(None): + # there is no request and the task has been completed.so ended + self.log.debug(" here is no request and the task has been completed.so engine will stop ..") + self.stop = True + break + if self.spider.state != "runing": + self.spider.state = "runing" + + if not waited: + await asyncio.sleep(0.02) + + self.spider.state = "closed" + self.reminder.go(Reminder.spider_close, self.spider) + self.spider.on_close() + for _t in works + handle_items: + _t.cancel() + self.reminder.go(Reminder.engin_close, self) + self.log.debug(f" engine stoped..") + + async def work(self): + seed = False + while not self.stop: + waited = False + request = self.scheduler.get() + if inspect.isawaitable(request): + waited = True + request = await request + resp = None + if isinstance(request, Request): + setattr(request, "__spider__", self.spider) + self.reminder.go(Reminder.request_scheduled, request) + if waited: + resp = await self._ensure_future_special(request) + else: + self._ensure_future(request) + _resp = self.downloader.get() + if not resp: + resp = _resp + + if resp is None: + if not waited: + await asyncio.sleep(0.1) + # let the_downloader can be scheduled, test 0.001-0.0006 is better + # uniform = random.uniform(0.0001, 0.006) + if not seed: + await asyncio.sleep(0.03) + seed = not seed + continue + custome_callback = resp.request.callback + if custome_callback: + request_generator = custome_callback(resp) + if request_generator: + self.request_generator_queue.append((custome_callback.__self__, request_generator)) + + @staticmethod + async def cancel_all_tasks(): + tasks = [] + for task in asyncio.Task.all_tasks(): + if task is not asyncio.tasks.Task.current_task(): + tasks.append(task) + task.cancel() + await asyncio.gather(*tasks, return_exceptions=True) + + def pause(self): + # self.log.info(f" out called pause.. so engine will pause.. ") + asyncio.create_task(self._lock()) + self.spider.state = "pause" + + def recover(self): + if self.lock and self.lock.locked(): + # self.log.info(f" out called recover.. so engine will recover.. ") + self.lock.release() + + def close(self): + # can make external active end engine + self.stop = True + tasks = asyncio.all_tasks() + for it in tasks: + it.cancel() + asyncio.gather(*tasks, return_exceptions=True) + self.log.debug(f" out called stop.. so engine close.. 
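# --- illustrative sketch, not part of the patch -------------------------------
# start() above fans out 50 work()/handle_item() consumers with ensure_future and
# simply cancels them all once the crawl is finished. Reduced version; the worker
# count and queue contents below are made up:
import asyncio


async def worker(name: str, queue: asyncio.Queue):
    while True:
        job = await queue.get()
        print(f"{name} handled {job}")
        queue.task_done()


async def main():
    queue = asyncio.Queue()
    for i in range(10):
        queue.put_nowait(f"job-{i}")

    workers = [asyncio.ensure_future(worker(f"w{i}", queue)) for i in range(3)]
    await queue.join()                     # wait until every job is task_done()
    for task in workers:                   # then stop the idle consumers
        task.cancel()
    await asyncio.gather(*workers, return_exceptions=True)


if __name__ == "__main__":
    asyncio.run(main())
# -------------------------------------------------------------------------------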
") + + async def _lock(self): + if self.lock is None: + self.lock = Lock() + await self.lock.acquire() + + def _ensure_future(self, request: Request): + # compatible py_3.6 + task = asyncio.ensure_future(self.downloader.download(request)) + key = str(uuid.uuid4()) + task._key = key + self.task_dict[key] = task + task.add_done_callback(self._check_complete_callback) + + async def _ensure_future_special(self, request: Request): + # compatible py_3.6 + key = str(uuid.uuid4()) + self.task_dict[key] = request + resp = await self.downloader.download(request) + self.task_dict.pop(key) + return resp + + def _handle_exception(self, spider, e): + if spider: + try: + self.log.error(f" occured exceptyion e {e} ", exc_info=True) + spider.on_exception_occured(e) + except BaseException: + pass + + def _check_can_stop(self, request): + if request: + return False + if len(self.task_dict) > 0: + return False + if len(self.pip_task_dict) > 0: + return False + if len(self.request_generator_queue) > 0: + return False + if self.item_queue.qsize() > 0: + return False + if self.downloader.response_queue.qsize() > 0: + return False + if len(self.request_generator_queue) > 0 and self.scheduler.scheduler_container.size() > 0: + return False + if self.scheduler.scheduler_container.size() > 0: + return False + start = time.time() + self.reminder.go(Reminder.engin_idle, self) + while not self.is_single: + end = time.time() + period = 10 + if (end - start) > period: + self.log.info(f"empty loop {period}s second so stop") + break + if self.scheduler.scheduler_container.size() <= 0: + time.sleep(0.1) + else: + return False + self.log.info("craw end ,engin will stop") + return True + + def _run_pip_async(self, pip, spider_ins, item, index, paralleled=False): + if not inspect.iscoroutinefunction(pip): + task = asyncio.get_running_loop().run_in_executor(None, pip, spider_ins, item) + else: + task = asyncio.ensure_future(pip(spider_ins, item)) + key = str(uuid.uuid4()) + task._key = key + if not paralleled: + task._index = index + task._spider = spider_ins + self.pip_task_dict[key] = task + task.add_done_callback(self._check_complete_pip) + + def _hand_piplines(self, spider_ins, item, index=0, paralleled=False): + if self.piplines is None or len(self.piplines.piplines) <= 0: + self.log.debug("get a item but can not find a piplinse to handle it so ignore it ") + return + if paralleled: + for order, pip in self.piplines.piplines: + if not callable(pip): + continue + self._run_pip_async(pip, spider_ins, item, index, paralleled) + else: + if not paralleled and len(self.piplines.piplines) < index + 1: + return + pip = self.piplines.piplines[index][1] + if not callable(pip): + return + self._run_pip_async(pip, spider_ins, item, index, paralleled) + + async def handle_item(self): + while not self.stop: + if self.item_queue.qsize() <= 0: + await asyncio.sleep(0.5) + item = await self.item_queue.get() + self.item_queue.task_done() + # item = self.item_queue.get_nowait() + self._hand_piplines(self.spider, item, paralleled=self.pipline_is_paralleled) diff --git a/smart/downloader.py b/smart/downloader.py index bd415a2..8b42088 100644 --- a/smart/downloader.py +++ b/smart/downloader.py @@ -14,6 +14,8 @@ import aiohttp from concurrent.futures import TimeoutError +from aiohttp import TCPConnector + from smart.log import log from smart.middlewire import Middleware from smart.response import Response @@ -33,14 +35,12 @@ def fetch(self, request: Request) -> Response: class AioHttpDown(BaseDown): async def fetch(self, request: Request) -> 
Response: - print('run') - session = None - resp = None + session, resp = None, None try: - session = request.session or aiohttp.ClientSession() + session = request.session or aiohttp.ClientSession(connector=TCPConnector(limit=1)) resp = await session.request(request.method, request.url, - timeout=request.timeout or 10, + timeout=request.timeout, headers=request.header or {}, cookies=request.cookies or {}, data=request.data or {}, @@ -127,7 +127,9 @@ async def download(self, request: Request): .run_in_executor(None, fetch, request) except TimeoutError as e: # delay retry - self.scheduler.schedlue(request) + wait = self.scheduler.schedlue(request) + if inspect.isawaitable(wait): + await wait self.log.debug( f'req to fetch is timeout now so this req will dely to sechdule for retry {request.url}') return @@ -158,6 +160,7 @@ def get(self) -> Optional[Response]: response = self.response_queue.get_nowait() if response: self.reminder.go(Reminder.response_received, response) + self.response_queue.task_done() return response async def _before_fetch(self, request): diff --git a/smart/item.py b/smart/item.py index ddbb9e2..e4cdceb 100644 --- a/smart/item.py +++ b/smart/item.py @@ -36,61 +36,6 @@ def __new__(cls, name, bases, attrs): return new_class -# class Item(metaclass=ItemMeta): -# """ -# Item class for each item -# """ -# -# def __init__(self, source): -# self.__source = source -# results = self.__get_item() or {} -# self.__dict__.update(results) -# -# def to_dict(self): -# dict___items = {k: v for k, v in self.__dict__.items() if not k.startswith("_")} -# return dict___items -# -# def extract(self, key, other_source): -# if not key or not other_source: -# return None -# cls = self.__class__ -# fields = getattr(cls, "__fields") -# if key not in fields.keys(): -# return None -# for k, v in fields.items(): -# if isinstance(v, BaseField): -# value = v.extract(other_source) -# self.__dict__.update(key=value) -# return value -# -# def __get_item( -# self, -# ) -> Any: -# cls = self.__class__ -# fields = getattr(cls, "__fields") -# dict = {} -# for k, v in fields.items(): -# if isinstance(v, BaseField): -# value = v.extract(self.__source) -# else: -# value = v -# dict.setdefault(k, value) -# for k, v in cls.__dict__.items(): -# if k.startswith("_"): -# continue -# dict.setdefault(k, v) -# return dict -# -# def __getitem__(self, key): -# return self.__dict__[key] -# -# def __setitem__(self, key, value): -# if key in self.__dict__.keys(): -# self.__dict__[key] = value -# else: -# raise KeyError("%s does not support field: %s" % -# (self.__class__.__name__, key)) - class Item(metaclass=ItemMeta): """ diff --git a/smart/runer.py b/smart/runer.py index 4b81ce9..f117c38 100644 --- a/smart/runer.py +++ b/smart/runer.py @@ -16,7 +16,7 @@ from urllib.request import urlopen from smart.log import log -from smart.core2 import Engine +from smart.core4 import Engine from smart.middlewire import Middleware from smart.pipline import Piplines from smart.setting import gloable_setting_dict @@ -40,7 +40,7 @@ def __init__(self, loop=None): # avoid a certain extent: too many files error loop = loop or asyncio.ProactorEventLoop() else: - self.loop = loop or asyncio.new_event_loop() + loop = loop or asyncio.new_event_loop() thread_pool_max_size = gloable_setting_dict.get( "thread_pool_max_size", 30) loop.set_default_executor(ThreadPoolExecutor(thread_pool_max_size)) @@ -123,8 +123,9 @@ def _run(self): raise ValueError("can not finded spider tasks to start so ended...") self._print_info() try: - group_tasks = 
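# --- illustrative sketch, not part of the patch -------------------------------
# AioHttpDown.fetch() above reuses request.session when one is attached and
# otherwise opens a throwaway aiohttp session with a small TCPConnector, reads the
# body, and closes up. Trimmed standalone fetch; the URL is arbitrary and, unlike
# the patch (which passes request.timeout directly), this sketch wraps the timeout
# in aiohttp.ClientTimeout:
import asyncio

import aiohttp
from aiohttp import TCPConnector


async def fetch(url, timeout=10):
    session = aiohttp.ClientSession(connector=TCPConnector(limit=1))
    try:
        resp = await session.request(
            "GET", url,
            timeout=aiohttp.ClientTimeout(total=timeout),
            headers={}, cookies={},
        )
        body = await resp.read()
        return resp.status, body
    finally:
        await session.close()


if __name__ == "__main__":
    status, body = asyncio.run(fetch("https://httpbin.org/get"))
    print(status, len(body))
# -------------------------------------------------------------------------------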
asyncio.gather(*tasks, loop=self.loop) + group_tasks = asyncio.gather(*tasks, loop=self.loop, return_exceptions=True) self.loop.run_until_complete(group_tasks) + self.loop.run_until_complete(self.loop.shutdown_asyncgens()) except CancelledError as e: self.log.debug(f" in loop, occured CancelledError e {e} ", exc_info=True) except KeyboardInterrupt as e2: diff --git a/smart/scheduler.py b/smart/scheduler.py index 7721a52..3a29542 100644 --- a/smart/scheduler.py +++ b/smart/scheduler.py @@ -5,6 +5,9 @@ # Date: 2020/12/21 # Desc: request scheduler, request filter # ------------------------------------------------------------------ +import asyncio +import inspect +import time from collections import deque from typing import Optional, Any @@ -13,6 +16,8 @@ from abc import ABC, abstractmethod +from smart.tool import mutations_bkdr_hash + class BaseSchedulerContainer(ABC): """ @@ -62,12 +67,12 @@ def __init__(self): def add(self, url): if url: - self.set_container.add(url) + self.set_container.add(mutations_bkdr_hash(url)) def contains(self, url): if not url: return False - if url in self.set_container: + if mutations_bkdr_hash(url) in self.set_container: return True return False @@ -95,7 +100,41 @@ def size(self) -> int: return len(self.url_queue) -class Scheduler: +class AsyncQequeSchedulerContainer(BaseSchedulerContainer): + """ + deque 保存request + """ + + def __init__(self): + self.url_queue = asyncio.Queue() + + async def push(self, request: Request): + await self.url_queue.put(request) + + async def pop(self) -> Optional[Request]: + res = await self.url_queue.get() + self.url_queue.task_done() + return res + + def size(self) -> int: + return self.url_queue.qsize() + + +class BaseScheduler(ABC): + """ + 请求调度器基类 + """ + + @abstractmethod + def schedlue(self, request: Request) -> bool: + pass + + @abstractmethod + def get(self) -> Optional[Request]: + pass + + +class Scheduler(BaseScheduler): """ 请求调度器 """ @@ -111,7 +150,7 @@ def __init__(self, duplicate_filter: BaseDuplicateFilter = None, self.scheduler_container = scheduler_container or DequeSchedulerContainer() self.log = log - def schedlue(self, request: Request): + def schedlue(self, request: Request) -> bool: """ 将请求放入 scheduler_container容器 :param request: 请求 @@ -124,9 +163,12 @@ def schedlue(self, request: Request): _url = request.url + ":" + str(request.retry) if self.duplicate_filter.contains(_url): self.log.debug(f"duplicate_filter filted ... url{_url} ") - return + return False self.duplicate_filter.add(_url) - self.scheduler_container.push(request) + push = self.scheduler_container.push(request) + if inspect.isawaitable(push): + asyncio.create_task(push) + return True def get(self) -> Optional[Request]: """ @@ -135,4 +177,62 @@ def get(self) -> Optional[Request]: :return: Optional[Request] """ self.log.debug(f"get a request to download task ") - return self.scheduler_container.pop() + pop = self.scheduler_container.pop() + if pop is None: + return None + + if inspect.isawaitable(pop): + # todo 同步代码里怎么等待 执行异步协程代码的结果? 
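# --- illustrative sketch, not part of the patch -------------------------------
# SampleDuplicateFilter above now stores a BKDR-style integer hash of each URL
# instead of the URL string itself, which keeps the in-memory seen-set small.
# Reduced version; the real helper is smart.tool.mutations_bkdr_hash and its seed
# is not visible in this hunk, so the 131 below is an assumption:
def bkdr_hash(value: str, seed: int = 131) -> int:
    h = 0
    for ch in value:
        h = seed * h + ord(ch)
    return h & 0x7FFFFFFF             # clamp to a positive 31-bit int


class HashDuplicateFilter:
    def __init__(self):
        self._seen = set()

    def add(self, url: str):
        if url:
            self._seen.add(bkdr_hash(url))

    def contains(self, url: str) -> bool:
        return bool(url) and bkdr_hash(url) in self._seen


if __name__ == "__main__":
    f = HashDuplicateFilter()
    f.add("http://example.com/page:0")
    print(f.contains("http://example.com/page:0"), f.contains("http://example.com/page:1"))
# -------------------------------------------------------------------------------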
+ task = asyncio.create_task(pop) + return task + else: + return pop + + +class AsyncScheduler(BaseScheduler): + """ + 请求调度器 + """ + + def __init__(self, duplicate_filter: BaseDuplicateFilter = None, + scheduler_container: BaseSchedulerContainer = None): + """ + 初始方法 + :param duplicate_filter: 去重器对象 + :param scheduler_container: 调度容器对象 + """ + self.duplicate_filter = duplicate_filter or SampleDuplicateFilter() + self.scheduler_container = scheduler_container or AsyncQequeSchedulerContainer() + self.log = log + + async def schedlue(self, request: Request) -> bool: + """ + 将请求放入 scheduler_container容器 + :param request: 请求 + :return: None + """ + self.log.debug(f"get a request {request} wating toschedlue ") + # dont_filter=true的请求不过滤 + if not request.dont_filter: + # retry 失败的 重试实现延迟调度 + _url = request.url + ":" + str(request.retry) + if self.duplicate_filter.contains(_url): + self.log.debug(f"duplicate_filter filted ... url{_url} ") + return False + self.duplicate_filter.add(_url) + push = self.scheduler_container.push(request) + if inspect.isawaitable(push): + await push + return True + + async def get(self) -> Optional[Request]: + """ + 从scheduler_container容器获取一个请求对象 + 可能为空 + :return: Optional[Request] + """ + self.log.debug(f"get a request to download task ") + req = self.scheduler_container.pop() + if inspect.isawaitable(req): + req = await req + return req diff --git a/smart/setting.py b/smart/setting.py index 3052f69..1713716 100644 --- a/smart/setting.py +++ b/smart/setting.py @@ -12,7 +12,7 @@ # request timeout 10 s "req_timeout": 10, # 每个爬虫的请求并发数 - "req_per_concurrent": 100, + "req_per_concurrent": 200, # 每个请求的最大重试次数 "req_max_retry": 3, # 默认请求头 @@ -29,22 +29,24 @@ # 请求url调度器容器 # 自己实现需要继承 BaseSchedulerContainer 实现相关抽象方法 系统默认DequeSchedulerContainer "scheduler_container_class": "smart.scheduler.DequeSchedulerContainer", + # 调度器 + "scheduler_class": "smart.scheduler.Scheduler", # 请求网络的方法 输入 request 输出 response # 自己实现需要继承 BaseDown 实现相关抽象方法 系统默认AioHttpDown "net_download_class": "smart.downloader.AioHttpDown", # 线程池数 当 middwire pipline 有不少耗时的同步方法时 适当调大 - "thread_pool_max_size": 50, + "thread_pool_max_size": 250, # 根据响应的状态码 忽略以下响应 "ignore_response_codes": [401, 403, 404, 405, 500, 502, 504], # 是否是分布式爬虫 "is_single": 1, # pipline之间 处理item 是否并行处理 默认 0 串行 1 并行 - "pipline_is_paralleled": 0, + "pipline_is_paralleled": 1, # 启动时网络是否畅通检查地址 "net_healthy_check_url": "https://www.baidu.com", # log level - "log_level": "info", + "log_level": "debug", "log_name": "smart-spider", "log_path": ".logs/smart.log", - "is_write_to_file": False, + "is_write_to_file": True, } diff --git a/smart/signal.py b/smart/signal.py index 80aa26f..170812a 100644 --- a/smart/signal.py +++ b/smart/signal.py @@ -5,7 +5,9 @@ # Date: 2021/1/15 # Desc: gloable sinal trigger # ------------------------------------------------------------------ -from blinker import Signal +from functools import partial + +from blinker import Signal, ANY class _Reminder: @@ -35,6 +37,8 @@ class _Reminder: def __init__(self, *args, **kwargs): pass + + def go(self, signal: Signal, *args, **kwargs): """ 在对应的时期点触发信号 diff --git a/smart/tool.py b/smart/tool.py index b673616..b2cae0e 100644 --- a/smart/tool.py +++ b/smart/tool.py @@ -11,7 +11,8 @@ import urllib # 验证Url 是否合法的正则 -RE_COMPILE = re.compile("(^https?:/{2}\w.+$)|(ftp://)") + +RE_COMPILE = re.compile("(^https?:/{2}\w.+$)|(ftp://)|(^ws?:/{2}\w.+$)") def is_valid_url(url): @@ -109,3 +110,5 @@ def mutations_bkdr_hash(value: str): for v in value: h = seed * h + ord(v) return h & 0x7FFFFFFF + + diff 
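# --- illustrative sketch, not part of the patch -------------------------------
# AsyncQequeSchedulerContainer / AsyncScheduler above move the request container
# onto an asyncio.Queue, so push() and pop() become coroutines the engine awaits
# (the sync Scheduler instead wraps an awaitable push in asyncio.create_task).
# Reduced version of the async container, with a plain string standing in for a
# Request:
import asyncio


class AsyncQueueContainer:
    def __init__(self):
        self._queue = asyncio.Queue()

    async def push(self, request):
        await self._queue.put(request)

    async def pop(self):
        request = await self._queue.get()
        self._queue.task_done()
        return request

    def size(self) -> int:
        return self._queue.qsize()


async def main():
    container = AsyncQueueContainer()
    await container.push("http://example.com/1")
    print(container.size(), await container.pop())


if __name__ == "__main__":
    asyncio.run(main())
# -------------------------------------------------------------------------------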
--git a/spiders/distributed/__init__.py b/spiders/distributed/__init__.py index 17b4cbf..0c1110a 100644 --- a/spiders/distributed/__init__.py +++ b/spiders/distributed/__init__.py @@ -13,49 +13,88 @@ import threading import time from collections import deque +from concurrent.futures.thread import ThreadPoolExecutor from typing import Optional from smart.request import Request from smart.scheduler import BaseDuplicateFilter, BaseSchedulerContainer import redis # 导入redis 模块 +from smart.signal import reminder +from test.redis_lock import acquire_lock, release_lock + class RedisSchuler(BaseSchedulerContainer): - pool = redis.ConnectionPool(host='localhost', port=6379, decode_responses=True) + pool = redis.ConnectionPool(host='121.4.157.53', port=6399, password="Admin123@@@", decode_responses=True) + + # pool = redis.ConnectionPool(host='localhost', port=6379, decode_responses=True) + tp = ThreadPoolExecutor(100) + + _stop = False def __init__(self): self.redis = redis.Redis(connection_pool=self.pool) self.task_queue_name = "smart_spider_redis_task_queue" # 需要保持session 的放在本地 或者序列化报错的request 的容器 self.faults = deque() + self.caches = deque() self.ecodeing = "latin1" + self.tp.submit(self.buffer) - def push(self, request: Request): + def _do_push(self, request: Request): try: req_byte = pickle.dumps(request) self.redis.rpush(self.task_queue_name, req_byte.decode(self.ecodeing)) except Exception: self.faults.append(request) - def pop(self) -> Optional[Request]: + def push(self, request: Request): + self.tp.submit(self._do_push, request) + + def buffer(self): + while not self._stop: + res = self._do_pop() + if res: + self.caches.append(res) + else: + time.sleep(0.5) + time.sleep(0.001) + + def _do_pop(self) -> Optional[Request]: if len(self.faults) > 0: req = self.faults.popleft() if req: return req else: - code = self.redis.lpop(self.task_queue_name) - if code: - req_byte = code.encode(self.ecodeing) - req = pickle.loads(req_byte) - return req + try: + # identifier = acquire_lock('resource') + code = self.redis.lpop(self.task_queue_name) + if code: + req_byte = code.encode(self.ecodeing) + req = pickle.loads(req_byte) + return req + except Exception as e: + print(e) + return None + + def pop(self) -> Optional[Request]: + if self.caches: + return self.caches.popleft() return None def size(self) -> int: return self.redis.llen(self.task_queue_name) + @staticmethod + @reminder.engin_close.connect + def engin_close(sender, **kwargs): + RedisSchuler._stop = True + RedisSchuler.tp.shutdown() + class RedisBaseDuplicateFilter(BaseDuplicateFilter): - pool = redis.ConnectionPool(host='localhost', port=6379, decode_responses=True) + # pool = redis.ConnectionPool(host='localhost', port=6379, decode_responses=True) + pool = redis.ConnectionPool(host='121.4.157.53', port=6399, password="Admin123@@@", decode_responses=True) def __init__(self): self.redis = redis.Redis(connection_pool=self.pool) diff --git a/spiders/ipspider2.py b/spiders/ipspider2.py index bb2706d..d895d24 100644 --- a/spiders/ipspider2.py +++ b/spiders/ipspider2.py @@ -25,17 +25,24 @@ class IpSpider(Spider): # "scheduler_container_class": "spiders.distributed.RedisSchuler", # "is_single": 0, # } + **{ + # "scheduler_container_class": "smart.scheduler.AsyncQequeSchedulerContainer", + # # # 调度器 + # "scheduler_class": "smart.scheduler.AsyncScheduler", + } } def start_requests(self): - for page in range(1000): + for page in range(100): url = f'http://exercise.kingname.info/exercise_middleware_ip/{page}' yield Request(url, callback=self.parse, 
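# --- illustrative sketch, not part of the patch -------------------------------
# RedisSchuler above shares requests between machines by pickling each Request,
# squeezing the bytes through latin-1 so they survive decode_responses=True, and
# rpush/lpop-ing them on a Redis list. Reduced round-trip; the localhost
# connection and queue name are placeholders, run it only against a disposable
# Redis instance:
import pickle

import redis

QUEUE = "demo_spider_task_queue"
ENCODING = "latin1"


def push(conn: redis.Redis, obj) -> None:
    conn.rpush(QUEUE, pickle.dumps(obj).decode(ENCODING))


def pop(conn: redis.Redis):
    raw = conn.lpop(QUEUE)
    return pickle.loads(raw.encode(ENCODING)) if raw else None


if __name__ == "__main__":
    conn = redis.Redis(host="localhost", port=6379, decode_responses=True)
    push(conn, {"url": "http://example.com", "retry": 0})
    print(pop(conn), "left:", conn.llen(QUEUE))
# -------------------------------------------------------------------------------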
dont_filter=False, timeout=9)

     def parse(self, response: Response):
-        print(response.status)
+        print(response.text)
         item = TestItem.get_item("")
-        yield item
+        # yield item
+        # yield Request(url=response.url + "2", callback=self.parse2, dont_filter=False, timeout=9)
+        # yield item
         # for i in range(1000):
         #     yield Request(url="https://www.baidu.com?q=" + str(i), callback=self.parse2)
@@ -53,6 +60,8 @@ def parse(self, response: Response):
     def parse2(self, response):
         print(response.status)
         print("parse2222")
+        item = TestItem.get_item("")
+        yield item

     def on_close(self):
         print('我被关闭了')
diff --git a/test/__init__.py b/test/__init__.py
index d86b0d6..52b5f4d 100644
--- a/test/__init__.py
+++ b/test/__init__.py
@@ -35,4 +35,4 @@ def print_on_response(spider_ins, request, response):
     global succedd
     succedd += 1
     print(f"response0: {response.status}")
-    print(f"succedd: {succedd}")
+    print(f"succedd: spider_ins {id(spider_ins)} {succedd}")
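# --- illustrative sketch, not part of the patch -------------------------------
# The receivers in test/__init__.py and the @reminder.engin_close.connect hook in
# spiders/distributed/__init__.py are blinker signal subscribers: a receiver takes
# the sender plus keyword arguments, and the framework fires them via
# reminder.go(...). Minimal standalone blinker usage; the signal name and payload
# below are made up:
from blinker import Signal

response_received = Signal("response_received")


@response_received.connect
def print_on_response(sender, **kwargs):
    # sender is whatever object emitted the signal; extra data arrives as kwargs
    print(f"sender={sender!r} status={kwargs.get('status')}")


if __name__ == "__main__":
    response_received.send("demo-spider", status=200)
# -------------------------------------------------------------------------------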