diff --git a/launcher_ds.py b/launcher_ds.py
index 6f92006..090ef7d 100644
--- a/launcher_ds.py
+++ b/launcher_ds.py
@@ -1,7 +1,9 @@
 import asyncio
 import atexit
+import multiprocessing
 import threading
 import time
+from datetime import datetime
 from multiprocessing.pool import Pool
 
 from smart.log import log
@@ -130,18 +132,28 @@ def xxx(sender, **kwargs):
     print("spider_start")
 
 
-if __name__ == '__main__':
+def main():
     starter = CrawStater()
     spider1 = GovsSpider()
     spider2 = JsonSpider()
     js_spider = JsSpider()
     gloable_setting_dict.update(
-        duplicate_filter_class="spiders.distributed.RedisBaseDuplicateFilter",
-        scheduler_container_class="spiders.distributed.RedisSchuler",
-        pipline_is_paralleled=1,
+        duplicate_filter_class="spiders.distributed.AioRedisBaseDuplicateFilter",
+        scheduler_container_class="spiders.distributed.AioRedisSchuler",
         is_single=0,
     )
-
     spider = IpSpider()
     starter.run_many([spider], middlewire=middleware2, pipline=piplinestest)
+
+
+if __name__ == '__main__':
+    start = time.time()
+    pool = multiprocessing.Pool(4)
+    for i in range(4):
+        pool.apply_async(main)
+    # main()
+    pool.close()
+    pool.join()
+    print(f'finished, took {time.time() - start}s')
+    # starter.run_many([spider])
diff --git a/smart/core4.py b/smart/core4.py
index 29c1f56..ecd6d7b 100644
--- a/smart/core4.py
+++ b/smart/core4.py
@@ -58,13 +58,14 @@ def __init__(self, spider, middlewire=None, pipline: Piplines = None):
             "pipline_is_paralleled") if pipline_is_paralleled is None else pipline_is_paralleled
         self.pipline_is_paralleled = pipline_is_paralleled
 
+        self.lock1 = asyncio.Lock()
+        self.lock2 = asyncio.Lock()
+
     def _get_dynamic_class_setting(self, key):
-        class_str = self.spider.cutome_setting_dict.get(
-            key) or gloable_setting_dict.get(
-            key)
+        class_str = self.spider.cutome_setting_dict.get(key) or gloable_setting_dict.get(key)
         _module = importlib.import_module(".".join(class_str.split(".")[:-1]))
         _class = getattr(_module, class_str.split(".")[-1])
-        self.log.info("_class:" + _class.__name__)
+        self.log.info(f"dynamic loaded key【{key}】--> class【{class_str}】success")
         return _class
 
     def iter_request(self):
@@ -137,13 +138,10 @@ async def start(self):
             if wait is not None and inspect.isawaitable(wait):
                 waited = True
                 await wait
-
-            # if not wait:
-            #     waited = True
-            #     await asyncio.sleep(0.1)
-
-            await asyncio.sleep(0.001)
-            if self._check_can_stop(None):
+            if not waited:
+                await asyncio.sleep(0.001)
+            can_stop = await self._check_can_stop(None)
+            if can_stop:
                 # there is no request and the task has been completed.so ended
                 self.log.debug(" here is no request and the task has been completed.so engine will stop ..")
                 self.stop = True
@@ -151,8 +149,10 @@ async def start(self):
             if self.spider.state != "runing":
                 self.spider.state = "runing"
 
-            if not waited:
-                await asyncio.sleep(0.02)
+            # if not waited:
+            #     await asyncio.sleep(0.001)
+            # if user_func_res is None or not wait:
+            #     await self.lock1.acquire()
 
         self.spider.state = "closed"
         self.reminder.go(Reminder.spider_close, self.spider)
@@ -174,6 +174,10 @@ async def work(self):
             if isinstance(request, Request):
                 setattr(request, "__spider__", self.spider)
                 self.reminder.go(Reminder.request_scheduled, request)
+                # if waited:
+                #     resp = await self._ensure_future_special(request)
+                # else:
+                #     self._ensure_future(request)
                 if waited:
                     resp = await self._ensure_future_special(request)
                 else:
                     self._ensure_future(request)
@@ -184,7 +188,7 @@
             if resp is None:
                 if not waited:
-                    await asyncio.sleep(0.1)
+                    await asyncio.sleep(0.004)
                    # let the_downloader can be scheduled, test 0.001-0.0006 is better
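The engine changes above (and the AsyncScheduler changes later in this patch) all lean on one bridging idiom: scheduler-container and duplicate-filter methods such as size(), push(), pop() and contains() may return either a plain value (the sync Redis / in-memory backends) or a coroutine (the new aioredis-backed classes), and the caller probes the result with inspect.isawaitable before awaiting it. Below is a minimal, self-contained sketch of that idiom; SyncContainer and AioContainer are hypothetical stand-ins used only for illustration, not classes from this repo.

import asyncio
import inspect


class SyncContainer:
    """Hypothetical in-memory container: size() returns an int directly."""

    def __init__(self):
        self._items = []

    def size(self):
        return len(self._items)


class AioContainer:
    """Hypothetical aioredis-style container: size() is a coroutine."""

    def __init__(self):
        self._items = []

    async def size(self):
        await asyncio.sleep(0)  # stands in for a real Redis round trip
        return len(self._items)


async def container_size(container):
    # one call site serves both backends, mirroring _check_can_stop in core4
    # and AsyncScheduler.schedlue in smart/scheduler.py
    size = container.size()
    if inspect.isawaitable(size):
        size = await size
    return size


async def main():
    print(await container_size(SyncContainer()))  # 0
    print(await container_size(AioContainer()))   # 0


if __name__ == '__main__':
    asyncio.run(main())

Probing the return value keeps a single engine code path working against both backends, at the cost of an extra isawaitable check per call.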
# uniform = random.uniform(0.0001, 0.006) if not seed: @@ -254,7 +258,7 @@ def _handle_exception(self, spider, e): except BaseException: pass - def _check_can_stop(self, request): + async def _check_can_stop(self, request): if request: return False if len(self.task_dict) > 0: @@ -267,15 +271,18 @@ def _check_can_stop(self, request): return False if self.downloader.response_queue.qsize() > 0: return False - if len(self.request_generator_queue) > 0 and self.scheduler.scheduler_container.size() > 0: + size = self.scheduler.scheduler_container.size() + if inspect.isawaitable(size): + size = await size + if len(self.request_generator_queue) > 0 and size > 0: return False - if self.scheduler.scheduler_container.size() > 0: + if size > 0: return False start = time.time() self.reminder.go(Reminder.engin_idle, self) while not self.is_single: end = time.time() - period = 10 + period = 5 if (end - start) > period: self.log.info(f"empty loop {period}s second so stop") break @@ -319,8 +326,10 @@ def _hand_piplines(self, spider_ins, item, index=0, paralleled=False): async def handle_item(self): while not self.stop: if self.item_queue.qsize() <= 0: - await asyncio.sleep(0.5) + await asyncio.sleep(0.2) + item = await self.item_queue.get() + self.item_queue.task_done() # item = self.item_queue.get_nowait() self._hand_piplines(self.spider, item, paralleled=self.pipline_is_paralleled) diff --git a/smart/core5.py b/smart/core5.py new file mode 100644 index 0000000..58fad0e --- /dev/null +++ b/smart/core5.py @@ -0,0 +1,347 @@ +# -*- coding utf-8 -*-# +# ------------------------------------------------------------------ +# Name: core +# Author: liangbaikai +# Date: 2020/12/22 +# Desc: there is a python file description +# ------------------------------------------------------------------ +import asyncio +import importlib +import inspect +import random +import time +import uuid +from asyncio import Lock, QueueEmpty +from collections import deque +from contextlib import suppress +from typing import Dict, Any + +from smart.log import log +from smart.downloader import Downloader +from smart.item import Item +from smart.pipline import Piplines +from smart.request import Request +from smart.response import Response +from smart.scheduler import Scheduler, DequeSchedulerContainer, AsyncScheduler +from smart.setting import gloable_setting_dict +from smart.signal import reminder, Reminder + + +class Engine: + def __init__(self, spider, middlewire=None, pipline: Piplines = None): + self.reminder = reminder + self.log = log + self.lock = None + self.task_dict: Dict[str, Any] = {} + self.pip_task_dict: Dict[str, asyncio.Task] = {} + self.spider = spider + self.middlewire = middlewire + self.piplines = pipline + duplicate_filter_class = self._get_dynamic_class_setting("duplicate_filter_class") + scheduler_container_class = self._get_dynamic_class_setting("scheduler_container_class") + net_download_class = self._get_dynamic_class_setting("net_download_class") + scheduler_class = self._get_dynamic_class_setting("scheduler_class") + self.scheduler = scheduler_class(duplicate_filter_class(), scheduler_container_class()) + req_per_concurrent = self.spider.cutome_setting_dict.get("req_per_concurrent") or gloable_setting_dict.get( + "req_per_concurrent") + single = self.spider.cutome_setting_dict.get("is_single") + self.is_single = gloable_setting_dict.get("is_single") if single is None else single + self.downloader = Downloader(self.scheduler, self.middlewire, reminder=self.reminder, + seq=req_per_concurrent, + 
downer=net_download_class()) + self.request_generator_queue = deque() + self.stop = False + self.item_queue = asyncio.Queue() + pipline_is_paralleled = self.spider.cutome_setting_dict.get("pipline_is_paralleled") + pipline_is_paralleled = gloable_setting_dict.get( + "pipline_is_paralleled") if pipline_is_paralleled is None else pipline_is_paralleled + self.pipline_is_paralleled = pipline_is_paralleled + + self.lock1 = asyncio.Lock() + self.lock2 = asyncio.Lock() + self.condition = asyncio.Condition(self.lock1) + self.cosomer = asyncio.Condition(self.lock1) + + def _get_dynamic_class_setting(self, key): + class_str = self.spider.cutome_setting_dict.get(key) or gloable_setting_dict.get(key) + _module = importlib.import_module(".".join(class_str.split(".")[:-1])) + _class = getattr(_module, class_str.split(".")[-1]) + self.log.info(f"dynamic loaded key【{key}】--> class【{class_str}】success") + return _class + + def iter_request(self): + while True: + if not self.request_generator_queue: + yield None + continue + request_generator = self.request_generator_queue[0] + spider, real_request_generator = request_generator[0], request_generator[1] + try: + request_or_item = next(real_request_generator) + except StopIteration: + self.request_generator_queue.popleft() + continue + except Exception as e: + # 可以处理异常 + self.request_generator_queue.popleft() + self._handle_exception(spider, e) + self.reminder.go(Reminder.spider_execption, spider, exception=e) + continue + yield request_or_item + + def _check_complete_pip(self, task): + if task.cancelled(): + self.log.debug(f" a task canceld ") + return + if task and task.done() and task._key: + if task.exception(): + self.log.error(f"a task occurer error in pipline {task.exception()} ") + else: + self.log.debug(f"a task done ") + result = task.result() + if result and isinstance(result, Item): + if hasattr(task, '_index'): + self._hand_piplines(task._spider, result, index=task._index + 1, paralleled=False) + self.pip_task_dict.pop(task._key) + + def _check_complete_callback(self, task): + if task.cancelled(): + self.log.debug(f" a task canceld ") + return + if task and task.done() and task._key: + self.log.debug(f"a task done ") + self.task_dict.pop(task._key) + + async def start(self): + self.spider.on_start() + self.reminder.go(Reminder.spider_start, self.spider) + self.reminder.go(Reminder.engin_start, self) + + self.request_generator_queue.append((self.spider, iter(self.spider))) + handle_items = [asyncio.ensure_future(self.handle_item()) for _ in range(11)] + works = [asyncio.ensure_future(self.work()) for _ in range(11)] + while not self.stop: + # 是否暂停了 + if self.lock and self.lock.locked(): + await asyncio.sleep(0.5) + continue + # 若是分布式爬虫 让内存里的任务不过堆积过多 尽量均分给其他机器 + if self.is_single: + if len(self.task_dict) > 2500: + await asyncio.sleep(0.3) + waited, wait = False, None + user_func_res = next(self.iter_request()) + if isinstance(user_func_res, Request): + wait = self.scheduler.schedlue(user_func_res) + + if isinstance(user_func_res, (dict, Item)): + wait = self.item_queue.put(user_func_res) + if wait is not None and inspect.isawaitable(wait): + waited = True + await wait + + # await asyncio.sleep(0.001) + if self._check_can_stop(None): + # there is no request and the task has been completed.so ended + self.log.debug(" here is no request and the task has been completed.so engine will stop ..") + self.stop = True + break + if self.spider.state != "runing": + self.spider.state = "runing" + + if user_func_res is None: + async with self.condition: + if 
not len(self.condition._waiters) > 0 and len(self.request_generator_queue) <= 0: + print('挂起') + await self.condition.wait() + print('醒了') + + # if not waited: + # await asyncio.sleep(0.001) + # if user_func_res is None or not wait: + # await self.lock1.acquire() + + self.spider.state = "closed" + self.reminder.go(Reminder.spider_close, self.spider) + self.spider.on_close() + for _t in works + handle_items: + _t.cancel() + self.reminder.go(Reminder.engin_close, self) + self.log.debug(f" engine stoped..") + + async def work(self): + seed = False + while not self.stop: + waited = False + print("work") + request = self.scheduler.get() + # if request is None or len(self.condition._waiters)>0: + + if inspect.isawaitable(request): + waited = True + request = await request + resp = None + + + if isinstance(request, Request): + setattr(request, "__spider__", self.spider) + self.reminder.go(Reminder.request_scheduled, request) + # if waited: + # resp = await self._ensure_future_special(request) + # else: + # self._ensure_future(request) + self._ensure_future(request) + _resp = self.downloader.get() + if not resp: + resp = _resp + + if resp is None: + if len(self.condition._waiters) > 0: + async with self.condition: + self.condition.notify() + # pass + if not waited: + await asyncio.sleep(0.1) + # let the_downloader can be scheduled, test 0.001-0.0006 is better + # uniform = random.uniform(0.0001, 0.006) + # if not seed: + # await asyncio.sleep(0.03) + seed = not seed + continue + custome_callback = resp.request.callback + if custome_callback: + request_generator = custome_callback(resp) + if request_generator: + self.request_generator_queue.append((self.spider, request_generator)) + + @staticmethod + async def cancel_all_tasks(): + tasks = [] + for task in asyncio.Task.all_tasks(): + if task is not asyncio.tasks.Task.current_task(): + tasks.append(task) + task.cancel() + await asyncio.gather(*tasks, return_exceptions=True) + + def pause(self): + # self.log.info(f" out called pause.. so engine will pause.. ") + asyncio.create_task(self._lock()) + self.spider.state = "pause" + + def recover(self): + if self.lock and self.lock.locked(): + # self.log.info(f" out called recover.. so engine will recover.. ") + self.lock.release() + + def close(self): + # can make external active end engine + self.stop = True + tasks = asyncio.all_tasks() + for it in tasks: + it.cancel() + asyncio.gather(*tasks, return_exceptions=True) + self.log.debug(f" out called stop.. so engine close.. 
") + + async def _lock(self): + if self.lock is None: + self.lock = Lock() + await self.lock.acquire() + + def _ensure_future(self, request: Request): + # compatible py_3.6 + task = asyncio.ensure_future(self.downloader.download(request)) + key = str(uuid.uuid4()) + task._key = key + self.task_dict[key] = task + task.add_done_callback(self._check_complete_callback) + + async def _ensure_future_special(self, request: Request): + # compatible py_3.6 + key = str(uuid.uuid4()) + self.task_dict[key] = request + resp = await self.downloader.download(request) + self.task_dict.pop(key) + return resp + + def _handle_exception(self, spider, e): + if spider: + try: + self.log.error(f" occured exceptyion e {e} ", exc_info=True) + spider.on_exception_occured(e) + except BaseException: + pass + + def _check_can_stop(self, request): + if request: + return False + if len(self.task_dict) > 0: + return False + if len(self.pip_task_dict) > 0: + return False + if len(self.request_generator_queue) > 0: + return False + if self.item_queue.qsize() > 0: + return False + if self.downloader.response_queue.qsize() > 0: + return False + if len(self.request_generator_queue) > 0 and self.scheduler.scheduler_container.size() > 0: + return False + if self.scheduler.scheduler_container.size() > 0: + return False + start = time.time() + self.reminder.go(Reminder.engin_idle, self) + while not self.is_single: + end = time.time() + period = 5 + if (end - start) > period: + self.log.info(f"empty loop {period}s second so stop") + break + if self.scheduler.scheduler_container.size() <= 0: + time.sleep(0.1) + else: + return False + self.log.info("craw end ,engin will stop") + return True + + def _run_pip_async(self, pip, spider_ins, item, index, paralleled=False): + if not inspect.iscoroutinefunction(pip): + task = asyncio.get_running_loop().run_in_executor(None, pip, spider_ins, item) + else: + task = asyncio.ensure_future(pip(spider_ins, item)) + key = str(uuid.uuid4()) + task._key = key + if not paralleled: + task._index = index + task._spider = spider_ins + self.pip_task_dict[key] = task + task.add_done_callback(self._check_complete_pip) + + def _hand_piplines(self, spider_ins, item, index=0, paralleled=False): + if self.piplines is None or len(self.piplines.piplines) <= 0: + self.log.debug("get a item but can not find a piplinse to handle it so ignore it ") + return + if paralleled: + for order, pip in self.piplines.piplines: + if not callable(pip): + continue + self._run_pip_async(pip, spider_ins, item, index, paralleled) + else: + if not paralleled and len(self.piplines.piplines) < index + 1: + return + pip = self.piplines.piplines[index][1] + if not callable(pip): + return + self._run_pip_async(pip, spider_ins, item, index, paralleled) + + async def handle_item(self): + while not self.stop: + print("RRRRRRRRRRR", len(self.condition._waiters)) + if self.item_queue.qsize() <= 0: + await asyncio.sleep(0.1) + + item = await self.item_queue.get() + self.item_queue.task_done() + # item = self.item_queue.get_nowait() + self._hand_piplines(self.spider, item, paralleled=self.pipline_is_paralleled) + async with self.condition: + self.condition.notify() diff --git a/smart/core6.py b/smart/core6.py new file mode 100644 index 0000000..eee7972 --- /dev/null +++ b/smart/core6.py @@ -0,0 +1,330 @@ +# -*- coding utf-8 -*-# +# ------------------------------------------------------------------ +# Name: core +# Author: liangbaikai +# Date: 2020/12/22 +# Desc: there is a python file description +# 
------------------------------------------------------------------ +import asyncio +import importlib +import inspect +import random +import time +import uuid +from asyncio import Lock, QueueEmpty +from collections import deque +from contextlib import suppress +from typing import Dict, Any + +from smart.log import log +from smart.downloader import Downloader +from smart.item import Item +from smart.pipline import Piplines +from smart.request import Request +from smart.response import Response +from smart.scheduler import Scheduler, DequeSchedulerContainer, AsyncScheduler +from smart.setting import gloable_setting_dict +from smart.signal import reminder, Reminder + + +class Engine: + def __init__(self, spider, middlewire=None, pipline: Piplines = None): + self.reminder = reminder + self.log = log + self.lock = None + self.task_dict: Dict[str, Any] = {} + self.pip_task_dict: Dict[str, asyncio.Task] = {} + self.spider = spider + self.middlewire = middlewire + self.piplines = pipline + duplicate_filter_class = self._get_dynamic_class_setting("duplicate_filter_class") + scheduler_container_class = self._get_dynamic_class_setting("scheduler_container_class") + net_download_class = self._get_dynamic_class_setting("net_download_class") + scheduler_class = self._get_dynamic_class_setting("scheduler_class") + self.scheduler = scheduler_class(duplicate_filter_class(), scheduler_container_class()) + req_per_concurrent = self.spider.cutome_setting_dict.get("req_per_concurrent") or gloable_setting_dict.get( + "req_per_concurrent") + single = self.spider.cutome_setting_dict.get("is_single") + self.is_single = gloable_setting_dict.get("is_single") if single is None else single + self.downloader = Downloader(self.scheduler, self.middlewire, reminder=self.reminder, + seq=req_per_concurrent, + downer=net_download_class()) + self.request_generator_queue = deque() + self.stop = False + self.condition = asyncio.Condition() + self.item_queue = asyncio.Queue() + pipline_is_paralleled = self.spider.cutome_setting_dict.get("pipline_is_paralleled") + pipline_is_paralleled = gloable_setting_dict.get( + "pipline_is_paralleled") if pipline_is_paralleled is None else pipline_is_paralleled + self.pipline_is_paralleled = pipline_is_paralleled + + self.lock1 = asyncio.Lock() + self.lock2 = asyncio.Lock() + + def _get_dynamic_class_setting(self, key): + class_str = self.spider.cutome_setting_dict.get(key) or gloable_setting_dict.get(key) + _module = importlib.import_module(".".join(class_str.split(".")[:-1])) + _class = getattr(_module, class_str.split(".")[-1]) + self.log.info(f"dynamic loaded key【{key}】--> class【{class_str}】success") + return _class + + def iter_request(self): + while True: + if not self.request_generator_queue: + yield None + continue + request_generator = self.request_generator_queue[0] + spider, real_request_generator = request_generator[0], request_generator[1] + try: + request_or_item = next(real_request_generator) + except StopIteration: + self.request_generator_queue.popleft() + continue + except Exception as e: + # 可以处理异常 + self.request_generator_queue.popleft() + self._handle_exception(spider, e) + self.reminder.go(Reminder.spider_execption, spider, exception=e) + continue + yield request_or_item + + def _check_complete_pip(self, task): + if task.cancelled(): + self.log.debug(f" a task canceld ") + return + if task and task.done() and task._key: + if task.exception(): + self.log.error(f"a task occurer error in pipline {task.exception()} ") + else: + self.log.debug(f"a task done ") + result = 
task.result() + if result and isinstance(result, Item): + if hasattr(task, '_index'): + self._hand_piplines(task._spider, result, index=task._index + 1, paralleled=False) + self.pip_task_dict.pop(task._key) + + def _check_complete_callback(self, task): + if task.cancelled(): + self.log.debug(f" a task canceld ") + return + if task and task.done() and task._key: + self.log.debug(f"a task done ") + self.task_dict.pop(task._key) + + async def start(self): + self.spider.on_start() + self.reminder.go(Reminder.spider_start, self.spider) + self.reminder.go(Reminder.engin_start, self) + + self.request_generator_queue.append((self.spider, iter(self.spider))) + handle_items = [asyncio.ensure_future(self.handle_item()) for _ in range(50)] + works = [asyncio.ensure_future(self.work()) for _ in range(50)] + while not self.stop: + # 是否暂停了 + if self.lock and self.lock.locked(): + await asyncio.sleep(0.5) + continue + # 若是分布式爬虫 让内存里的任务不过堆积过多 尽量均分给其他机器 + if self.is_single: + if len(self.task_dict) > 1500: + await asyncio.sleep(0.3) + waited, wait = False, None + user_func_res = next(self.iter_request()) + if isinstance(user_func_res, Request): + wait = self.scheduler.schedlue(user_func_res) + + if isinstance(user_func_res, (dict, Item)): + wait = self.item_queue.put(user_func_res) + if wait is not None and inspect.isawaitable(wait): + waited = True + await wait + + await asyncio.sleep(0.001) + if self._check_can_stop(None): + # there is no request and the task has been completed.so ended + self.log.debug(" here is no request and the task has been completed.so engine will stop ..") + self.stop = True + break + if self.spider.state != "runing": + self.spider.state = "runing" + + # if not waited: + # await asyncio.sleep(0.001) + # if user_func_res is None or not wait: + # await self.lock1.acquire() + + self.spider.state = "closed" + self.reminder.go(Reminder.spider_close, self.spider) + self.spider.on_close() + for _t in works + handle_items: + _t.cancel() + self.reminder.go(Reminder.engin_close, self) + self.log.debug(f" engine stoped..") + + async def work(self): + seed = False + while not self.stop: + waited = False + request = self.scheduler.get() + if inspect.isawaitable(request): + waited = True + request = await request + resp = None + if isinstance(request, Request): + setattr(request, "__spider__", self.spider) + self.reminder.go(Reminder.request_scheduled, request) + if waited: + resp = await self._ensure_future_special(request) + else: + self._ensure_future(request) + _resp = self.downloader.get() + if not resp: + resp = _resp + + if resp is None: + if not waited: + await asyncio.sleep(0.004) + # let the_downloader can be scheduled, test 0.001-0.0006 is better + # uniform = random.uniform(0.0001, 0.006) + if not seed: + await asyncio.sleep(0.03) + seed = not seed + continue + custome_callback = resp.request.callback + if custome_callback: + request_generator = custome_callback(resp) + if request_generator: + self.request_generator_queue.append((custome_callback.__self__, request_generator)) + + + @staticmethod + async def cancel_all_tasks(): + tasks = [] + for task in asyncio.Task.all_tasks(): + if task is not asyncio.tasks.Task.current_task(): + tasks.append(task) + task.cancel() + await asyncio.gather(*tasks, return_exceptions=True) + + def pause(self): + # self.log.info(f" out called pause.. so engine will pause.. ") + asyncio.create_task(self._lock()) + self.spider.state = "pause" + + def recover(self): + if self.lock and self.lock.locked(): + # self.log.info(f" out called recover.. 
so engine will recover.. ") + self.lock.release() + + def close(self): + # can make external active end engine + self.stop = True + tasks = asyncio.all_tasks() + for it in tasks: + it.cancel() + asyncio.gather(*tasks, return_exceptions=True) + self.log.debug(f" out called stop.. so engine close.. ") + + async def _lock(self): + if self.lock is None: + self.lock = Lock() + await self.lock.acquire() + + def _ensure_future(self, request: Request): + # compatible py_3.6 + task = asyncio.ensure_future(self.downloader.download(request)) + key = str(uuid.uuid4()) + task._key = key + self.task_dict[key] = task + task.add_done_callback(self._check_complete_callback) + + async def _ensure_future_special(self, request: Request): + # compatible py_3.6 + key = str(uuid.uuid4()) + self.task_dict[key] = request + resp = await self.downloader.download(request) + self.task_dict.pop(key) + return resp + + def _handle_exception(self, spider, e): + if spider: + try: + self.log.error(f" occured exceptyion e {e} ", exc_info=True) + spider.on_exception_occured(e) + except BaseException: + pass + + def _check_can_stop(self, request): + if request: + return False + if len(self.task_dict) > 0: + return False + if len(self.pip_task_dict) > 0: + return False + if len(self.request_generator_queue) > 0: + return False + if self.item_queue.qsize() > 0: + return False + if self.downloader.response_queue.qsize() > 0: + return False + if len(self.request_generator_queue) > 0 and self.scheduler.scheduler_container.size() > 0: + return False + return True + # if self.scheduler.scheduler_container.size() > 0: + # return False + start = time.time() + self.reminder.go(Reminder.engin_idle, self) + while not self.is_single: + end = time.time() + period = 5 + if (end - start) > period: + self.log.info(f"empty loop {period}s second so stop") + break + if self.scheduler.scheduler_container.size() <= 0: + time.sleep(0.1) + else: + return False + self.log.info("craw end ,engin will stop") + return True + + def _run_pip_async(self, pip, spider_ins, item, index, paralleled=False): + if not inspect.iscoroutinefunction(pip): + task = asyncio.get_running_loop().run_in_executor(None, pip, spider_ins, item) + else: + task = asyncio.ensure_future(pip(spider_ins, item)) + key = str(uuid.uuid4()) + task._key = key + if not paralleled: + task._index = index + task._spider = spider_ins + self.pip_task_dict[key] = task + task.add_done_callback(self._check_complete_pip) + + def _hand_piplines(self, spider_ins, item, index=0, paralleled=False): + if self.piplines is None or len(self.piplines.piplines) <= 0: + self.log.debug("get a item but can not find a piplinse to handle it so ignore it ") + return + if paralleled: + for order, pip in self.piplines.piplines: + if not callable(pip): + continue + self._run_pip_async(pip, spider_ins, item, index, paralleled) + else: + if not paralleled and len(self.piplines.piplines) < index + 1: + return + pip = self.piplines.piplines[index][1] + if not callable(pip): + return + self._run_pip_async(pip, spider_ins, item, index, paralleled) + + async def handle_item(self): + while not self.stop: + if self.item_queue.qsize() <= 0: + await asyncio.sleep(0.2) + + item = await self.item_queue.get() + + self.item_queue.task_done() + # item = self.item_queue.get_nowait() + self._hand_piplines(self.spider, item, paralleled=self.pipline_is_paralleled) + diff --git a/smart/core8.py b/smart/core8.py new file mode 100644 index 0000000..c1df1c4 --- /dev/null +++ b/smart/core8.py @@ -0,0 +1,257 @@ +# -*- coding utf-8 -*-# +# 
------------------------------------------------------------------ +# Name: core +# Author: liangbaikai +# Date: 2020/12/22 +# Desc: there is a python file description +# ------------------------------------------------------------------ +import asyncio +import importlib +import inspect +import time +import uuid +from asyncio import Lock +from collections import deque +from typing import Dict + +from smart.log import log +from smart.downloader import Downloader +from smart.item import Item +from smart.pipline import Piplines +from smart.request import Request +from smart.scheduler import Scheduler +from smart.setting import gloable_setting_dict +from smart.signal import reminder + + +class Engine: + def __init__(self, spider, middlewire=None, pipline: Piplines = None): + self.lock = None + self.task_dict: Dict[str, asyncio.Task] = {} + self.pip_task_dict: Dict[str, asyncio.Task] = {} + self.spider = spider + self.middlewire = middlewire + self.piplines = pipline + self.reminder = reminder + duplicate_filter_class = self._get_dynamic_class_setting("duplicate_filter_class") + scheduler_container_class = self._get_dynamic_class_setting("scheduler_container_class") + net_download_class = self._get_dynamic_class_setting("net_download_class") + self.scheduler = Scheduler(duplicate_filter_class(), scheduler_container_class()) + req_per_concurrent = self.spider.cutome_setting_dict.get("req_per_concurrent") or gloable_setting_dict.get( + "req_per_concurrent") + self.downloader = Downloader(self.scheduler, self.middlewire, seq=req_per_concurrent, reminder=self.reminder, + downer=net_download_class()) + self.request_generator_queue = deque() + self.stop = False + self.log = log + + def _get_dynamic_class_setting(self, key): + class_str = self.spider.cutome_setting_dict.get( + key) or gloable_setting_dict.get( + key) + _module = importlib.import_module(".".join(class_str.split(".")[:-1])) + _class = getattr(_module, class_str.split(".")[-1]) + return _class + + def iter_request(self): + while True: + if not self.request_generator_queue: + yield None + continue + request_generator = self.request_generator_queue[0] + spider, real_request_generator = request_generator[0], request_generator[1] + try: + # execute and get a request from cutomer code + # request=real_request_generator.send(None) + request_or_item = next(real_request_generator) + if isinstance(request_or_item, Request): + request_or_item.__spider__ = spider + except StopIteration: + self.request_generator_queue.popleft() + continue + except Exception as e: + # 可以处理异常 + self.request_generator_queue.popleft() + self._handle_exception(spider, e) + continue + yield request_or_item + + def _check_complete_pip(self, task): + if task.cancelled(): + self.log.debug(f" a task canceld ") + return + if task and task.done() and task._key: + if task.exception(): + self.log.error(f"a task occurer error in pipline {task.exception()} ") + else: + self.log.debug(f"a task done ") + result = task.result() + if result and isinstance(result, Item): + if hasattr(task, '_index'): + self._hand_piplines(task._spider, result, task._index + 1) + self.pip_task_dict.pop(task._key) + + def _check_complete_callback(self, task): + if task.cancelled(): + self.log.debug(f" a task canceld ") + return + if task and task.done() and task._key: + self.log.debug(f"a task done ") + self.task_dict.pop(task._key) + + async def _start(self): + # core implenment + while not self.stop: + # paused + print('master') + if self.lock and self.lock.locked(): + await asyncio.sleep(1) + continue 
+ if len(self.request_generator_queue) > 0: + print('request_generator_queue', len(self.request_generator_queue)) + for request_or_item in self.iter_request(): + if request_or_item is None: + break + print("request_or_item", request_or_item) + # request_or_item = next(self.iter_request()) + if isinstance(request_or_item, Request): + self.scheduler.schedlue(request_or_item) + + if isinstance(request_or_item, Item): + self._hand_piplines(self.spider, request_or_item) + + # self.request_generator_queue.append( request_generator) + else: + print('else') + request = self.scheduler.get() + + if isinstance(request, Request): + self._ensure_future(request) + + resp = self.downloader.get() + + if resp is None: + # let the_downloader can be scheduled, test 0.001-0.0006 is better + await asyncio.sleep(0.001) + continue + custome_callback = resp.request.callback + if custome_callback: + request_generator = custome_callback(resp) + if request_generator: + self.request_generator_queue.append((self.spider, request_generator)) + if self.spider.state != "runing": + self.spider.state = "runing" + + async def start(self): + self.spider.on_start() + self.request_generator_queue.append((self.spider, iter(self.spider))) + works = [asyncio.ensure_future(self._start()) for _ in range(2)] + self.spider.state = "closed" + self.spider.on_close() + while not self.stop: + can_stop = self._check_can_stop(None) + if can_stop: + # there is no request and the task has been completed.so ended + self.log.debug( + f" here is no request and the task has been completed.so engine will stop ..") + self.stop = True + break + await asyncio.sleep(0.3) + # wait some resource to freed + await asyncio.sleep(0.15) + self.log.debug(f" engine stoped..") + + def pause(self): + self.log.info(f" out called pause.. so engine will pause.. ") + asyncio.create_task(self._lock()) + self.spider.state = "pause" + + def recover(self): + if self.lock and self.lock.locked(): + self.log.info(f" out called recover.. so engine will recover.. ") + self.lock.release() + + def close(self): + # can make external active end engine + self.stop = True + tasks = asyncio.all_tasks() + for it in tasks: + it.cancel() + asyncio.gather(*tasks, return_exceptions=True) + self.log.debug(f" out called stop.. so engine close.. 
") + + async def _lock(self): + if self.lock is None: + self.lock = Lock() + await self.lock.acquire() + + def _ensure_future(self, request: Request): + # compatible py_3.6 + task = asyncio.ensure_future(self.downloader.download(request)) + key = str(uuid.uuid4()) + task._key = key + self.task_dict[key] = task + task.add_done_callback(self._check_complete_callback) + + def _handle_exception(self, spider, e): + if spider: + try: + self.log.error(f" occured exceptyion e {e} ", exc_info=True) + spider.on_exception_occured(e) + except BaseException: + pass + + def _check_can_stop(self, request): + + if request: + return False + if len(self.task_dict) > 0: + return False + if len(self.pip_task_dict) > 0: + return False + if len(self.request_generator_queue) > 0: + return False + if len(self.request_generator_queue) > 0 and self.scheduler.scheduler_container.size() > 0: + return False + if self.downloader.response_queue.qsize() > 0: + return False + if self.scheduler.scheduler_container.size() > 0: + return False + start = time.time() + while 1: + end = time.time() + if (end - start) > 1.0: + print("空转 超过1s 停止") + break + if self.scheduler.scheduler_container.size() <= 0: + time.sleep(0.05) + else: + return False + + pass + + return True + + def _hand_piplines(self, spider_ins, item, index=0): + if self.piplines is None or len(self.piplines.piplines) <= 0: + self.log.info("get a item but can not find a piplinse to handle it so ignore it ") + return + + if len(self.piplines.piplines) < index + 1: + return + + pip = self.piplines.piplines[index][1] + + if not callable(pip): + return + + if not inspect.iscoroutinefunction(pip): + task = asyncio.get_running_loop().run_in_executor(None, pip, spider_ins, item) + else: + task = asyncio.ensure_future(pip(spider_ins, item)) + key = str(uuid.uuid4()) + task._key = key + task._index = index + task._spider = spider_ins + self.pip_task_dict[key] = task + task.add_done_callback(self._check_complete_pip) diff --git a/smart/core9.py b/smart/core9.py new file mode 100644 index 0000000..bba4be9 --- /dev/null +++ b/smart/core9.py @@ -0,0 +1,231 @@ +# -*- coding utf-8 -*-# +# ------------------------------------------------------------------ +# Name: core +# Author: liangbaikai +# Date: 2020/12/22 +# Desc: there is a python file description +# ------------------------------------------------------------------ +import asyncio +import importlib +import inspect +import random +import time +import uuid +from asyncio import Lock, QueueEmpty +from collections import deque +from contextlib import suppress +from types import AsyncGeneratorType, GeneratorType +from typing import Dict, Any + +import typing + +from smart.log import log +from smart.downloader import Downloader +from smart.item import Item +from smart.pipline import Piplines +from smart.request import Request +from smart.response import Response +from smart.scheduler import Scheduler, DequeSchedulerContainer, AsyncScheduler +from smart.setting import gloable_setting_dict +from smart.signal import reminder, Reminder + + +class Engine: + def __init__(self, spider, middlewire=None, pipline: Piplines = None): + self.reminder = reminder + self.log = log + self.task_dict: Dict[str, Any] = {} + self.pip_task_dict: Dict[str, asyncio.Task] = {} + self.spider = spider + self.middlewire = middlewire + self.piplines = pipline + duplicate_filter_class = self._get_dynamic_class_setting("duplicate_filter_class") + scheduler_container_class = self._get_dynamic_class_setting("scheduler_container_class") + net_download_class = 
self._get_dynamic_class_setting("net_download_class") + scheduler_class = self._get_dynamic_class_setting("scheduler_class") + self.scheduler = scheduler_class(duplicate_filter_class(), scheduler_container_class()) + req_per_concurrent = self.spider.cutome_setting_dict.get("req_per_concurrent") or gloable_setting_dict.get( + "req_per_concurrent") + single = self.spider.cutome_setting_dict.get("is_single") + self.is_single = gloable_setting_dict.get("is_single") if single is None else single + self.downloader = Downloader(self.scheduler, self.middlewire, reminder=self.reminder, + seq=req_per_concurrent, + downer=net_download_class()) + self.request_generator_queue = asyncio.Queue() + + self.stop = False + self.condition = asyncio.Condition() + self.item_queue = asyncio.Queue() + pipline_is_paralleled = self.spider.cutome_setting_dict.get("pipline_is_paralleled") + pipline_is_paralleled = gloable_setting_dict.get( + "pipline_is_paralleled") if pipline_is_paralleled is None else pipline_is_paralleled + self.pipline_is_paralleled = pipline_is_paralleled + + self.lock1 = asyncio.Lock() + self.lock2 = asyncio.Lock() + self.worker_tasks = [] + + def _get_dynamic_class_setting(self, key): + class_str = self.spider.cutome_setting_dict.get(key) or gloable_setting_dict.get(key) + _module = importlib.import_module(".".join(class_str.split(".")[:-1])) + _class = getattr(_module, class_str.split(".")[-1]) + self.log.info(f"dynamic loaded key【{key}】--> class【{class_str}】success") + return _class + + async def process_start_urls(self): + """ + Process the start URLs + :return: AN async iterator + """ + for req in self.spider: + yield req + + async def start(self): + self.spider.on_start() + self.reminder.go(Reminder.spider_start, self.spider) + self.reminder.go(Reminder.engin_start, self) + async for request_ins in self.process_start_urls(): + self.request_generator_queue.put_nowait(self.handle_request(request_ins)) + workers = [ + asyncio.ensure_future(self.start_worker()) + for _ in range(3) + ] + await self.request_generator_queue.join() + for t in workers: + await t + + self.spider.state = "closed" + self.reminder.go(Reminder.spider_close, self.spider) + self.spider.on_close() + # for _t in works + handle_items: + # _t.cancel() + self.reminder.go(Reminder.engin_close, self) + self.log.debug(f" engine stoped..") + + async def handle_request( + self, _request: Request + ): + """ + Wrap request with middleware. + :param request: + :return: + """ + # pass_through + request = await self._pass_through_schedule(_request) + callback_result, response = None, None + try: + setattr(request, "__spider__", self.spider) + response = await self.downloader.download(request) + if response is None: + return + if request.callback: + if inspect.iscoroutinefunction(request.callback): + callback_result = await request.callback(response) + else: + callback_result = request.callback(response) + except Exception as e: + self.log.error(f"0 and isinstance(complete[0],BaseException): + raise complete[0] self.loop.run_until_complete(self.loop.shutdown_asyncgens()) except CancelledError as e: self.log.debug(f" in loop, occured CancelledError e {e} ", exc_info=True) @@ -136,7 +138,7 @@ def _run(self): self.log.info(f'craw succeed {",".join(self.spider_names)} ended.. it cost {round(time.time() - start, 3)} s') - def _print_info(self): + def _print_logo_info(self): self.log.info("good luck!") self.log.info( """ @@ -158,8 +160,8 @@ def _print_info(self): " \r\n proverbs: whatever is worth doing is worth doing well." 
         )
 
-    @classmethod
-    def _check_internet_state(cls):
+    def _check_internet_state(self):
+        self.log.info("check internet health")
         error_msg = "internet may not be available please check net, run ended"
         net_healthy_check_url = gloable_setting_dict.get("net_healthy_check_url", None)
         if net_healthy_check_url is None:
diff --git a/smart/scheduler.py b/smart/scheduler.py
index 3a29542..25c3cb0 100644
--- a/smart/scheduler.py
+++ b/smart/scheduler.py
@@ -162,7 +162,7 @@ def schedlue(self, request: Request) -> bool:
             # retries of failed requests are scheduled with a delay
             _url = request.url + ":" + str(request.retry)
             if self.duplicate_filter.contains(_url):
-                self.log.debug(f"duplicate_filter filted ... url{_url} ")
+                self.log.debug(f"duplicate_filter filted ... url {_url} ")
                 return False
             self.duplicate_filter.add(_url)
         push = self.scheduler_container.push(request)
@@ -182,7 +182,6 @@ def get(self) -> Optional[Request]:
             return None
         if inspect.isawaitable(pop):
-            # todo how to wait for the result of an async coroutine from sync code?
            task = asyncio.create_task(pop)
             return task
         else:
@@ -216,10 +215,16 @@ async def schedlue(self, request: Request) -> bool:
         if not request.dont_filter:
             # retries of failed requests are scheduled with a delay
             _url = request.url + ":" + str(request.retry)
-            if self.duplicate_filter.contains(_url):
+            contains = self.duplicate_filter.contains(_url)
+            if inspect.isawaitable(contains):
+                contains = await contains
+            if contains:
                 self.log.debug(f"duplicate_filter filted ... url{_url} ")
                 return False
-            self.duplicate_filter.add(_url)
+            filter_add = self.duplicate_filter.add(_url)
+            if inspect.isawaitable(filter_add):
+                await filter_add
+
         push = self.scheduler_container.push(request)
         if inspect.isawaitable(push):
             await push
diff --git a/smart/setting.py b/smart/setting.py
index 1713716..4685160 100644
--- a/smart/setting.py
+++ b/smart/setting.py
@@ -17,11 +17,9 @@
     "req_max_retry": 3,
     # default request headers
     "default_headers": {
-        "Accept": "*/*;",
-        "Accept-Encoding": "gzip, deflate",
         "Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8,en-GB;q=0.7,en-US;q=0.6",
         # Baidu search engine spider UA
-        "user-agent": "Mozilla/5.0 (compatible; Baiduspider/2.0; +http://www.baidu.com/search/spider.html)"
+        "user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/77.0.3865.120 Safari/537.36"
     },
     # request URL deduplication handler
     # a custom implementation must subclass BaseDuplicateFilter and implement its abstract methods; default is SampleDuplicateFilter
@@ -45,8 +43,8 @@
     # URL used to check network connectivity at startup
     "net_healthy_check_url": "https://www.baidu.com",
     # log level
-    "log_level": "debug",
+    "log_level": "info",
     "log_name": "smart-spider",
     "log_path": ".logs/smart.log",
-    "is_write_to_file": True,
+    "is_write_to_file": False,
 }
diff --git a/smart/tool.py b/smart/tool.py
index b2cae0e..69ffe6f 100644
--- a/smart/tool.py
+++ b/smart/tool.py
@@ -102,13 +102,13 @@ def mutations_bkdr_hash(value: str):
         value = ''
     if not isinstance(value, str):
         value = str(value)
-    if len(value) >= 10000:
-        value = get_md5(value)
-
-    seed = 131
-    h = 0
-    for v in value:
-        h = seed * h + ord(v)
-    return h & 0x7FFFFFFF
+    value = get_md5(value)
+    return value
+
+    # seed = 131
+    # h = 0
+    # for v in value:
+    #     h = seed * h + ord(v)
+    # return h & 0x7FFFFFFF
diff --git a/spiders/distributed/__init__.py b/spiders/distributed/__init__.py
index 0c1110a..93ad452 100644
--- a/spiders/distributed/__init__.py
+++ b/spiders/distributed/__init__.py
@@ -5,6 +5,7 @@
 # Date: 2021/1/14
 # Desc: there is a python file description
 # ------------------------------------------------------------------
+import asyncio
 import base64
 import hashlib
 import json
@@ -15,18 +16,23 @@
 from collections import deque
 from concurrent.futures.thread import
ThreadPoolExecutor from typing import Optional + +import aioredis + +from smart.log import log from smart.request import Request from smart.scheduler import BaseDuplicateFilter, BaseSchedulerContainer import redis # 导入redis 模块 from smart.signal import reminder +from smart.tool import mutations_bkdr_hash from test.redis_lock import acquire_lock, release_lock class RedisSchuler(BaseSchedulerContainer): - pool = redis.ConnectionPool(host='121.4.157.53', port=6399, password="Admin123@@@", decode_responses=True) + # pool = redis.ConnectionPool(host='121.4.157.53', port=6399, password="Admin123@@@", decode_responses=True) - # pool = redis.ConnectionPool(host='localhost', port=6379, decode_responses=True) + pool = redis.ConnectionPool(host='localhost', port=6379, decode_responses=True) tp = ThreadPoolExecutor(100) _stop = False @@ -51,12 +57,18 @@ def push(self, request: Request): self.tp.submit(self._do_push, request) def buffer(self): + cons = [] while not self._stop: + # if len(cons) > 0 and (max(cons) - min(cons) > 3 and len(cons) > 90): + # time.sleep(random.uniform(0.2, 2)) + # cons = [] + res = self._do_pop() if res: self.caches.append(res) + cons.append(time.time()) else: - time.sleep(0.5) + time.sleep(random.uniform(0.1, 0.5)) time.sleep(0.001) def _do_pop(self) -> Optional[Request]: @@ -92,9 +104,9 @@ def engin_close(sender, **kwargs): class RedisBaseDuplicateFilter(BaseDuplicateFilter): - # pool = redis.ConnectionPool(host='localhost', port=6379, decode_responses=True) + pool = redis.ConnectionPool(host='localhost', port=6379, decode_responses=True) - pool = redis.ConnectionPool(host='121.4.157.53', port=6399, password="Admin123@@@", decode_responses=True) + # pool = redis.ConnectionPool(host='121.4.157.53', port=6399, password="Admin123@@@", decode_responses=True) def __init__(self): self.redis = redis.Redis(connection_pool=self.pool) @@ -102,10 +114,75 @@ def __init__(self): def add(self, url): if url: - self.redis.sadd(self.filterset_name, url) + self.redis.sadd(self.filterset_name, mutations_bkdr_hash(url)) def contains(self, url): - return self.redis.sismember(self.filterset_name, url) + res = self.redis.sismember(self.filterset_name, mutations_bkdr_hash(url)) + return res def length(self): return self.redis.scard(self.filterset_name) + + +class AioRedisBaseDuplicateFilter(BaseDuplicateFilter): + # pool = redis.ConnectionPool(host='121.4.157.53', port=6399, password="Admin123@@@", decode_responses=True) + def __init__(self): + self.redis = None + self.lock = asyncio.Lock() + self.filterset_name = "aio_smart_spider_redis_repeat_set" + + async def add(self, url): + await self.creat_redi() + if url: + await self.redis.sadd(self.filterset_name, mutations_bkdr_hash(url)) + + async def contains(self, url): + await self.creat_redi() + res =await self.redis.sismember(self.filterset_name, mutations_bkdr_hash(url)) + return res + + async def length(self): + await self.creat_redi() + return await self.redis.scard(self.filterset_name) + + async def creat_redi(self): + if not self.redis: + async with self.lock: + if not self.redis: + self.redis = await aioredis.create_redis_pool(('127.0.0.1', 6379), db=0, encoding='utf-8') + print("#############") + +class AioRedisSchuler(BaseSchedulerContainer): + def __init__(self): + self.redis = None + self.task_queue_name = "aio_smart_spider_redis_task_queue" + self.ecodeing = "latin1" + self.lock = asyncio.Lock() + + async def push(self, request: Request): + await self.creat_redi() + req_byte = pickle.dumps(request) + await 
self.redis.rpush(self.task_queue_name, req_byte.decode(self.ecodeing)) + + async def creat_redi(self): + if not self.redis: + async with self.lock: + if not self.redis: + self.redis = await aioredis.create_redis_pool(('127.0.0.1', 6379), db=0, encoding='utf-8') + print("#############") + + async def pop(self) -> Optional[Request]: + try: + await self.creat_redi() + code = await self.redis.lpop(self.task_queue_name) + if code: + req_byte = code.encode(self.ecodeing) + req = pickle.loads(req_byte) + return req + except Exception as e: + print(e) + return None + + async def size(self) -> int: + await self.creat_redi() + return await self.redis.llen(self.task_queue_name) diff --git a/spiders/ipspider2.py b/spiders/ipspider2.py index d895d24..d66d05d 100644 --- a/spiders/ipspider2.py +++ b/spiders/ipspider2.py @@ -1,5 +1,6 @@ import asyncio import json +import random import threading from aiohttp import ClientSession @@ -27,7 +28,7 @@ class IpSpider(Spider): # } **{ # "scheduler_container_class": "smart.scheduler.AsyncQequeSchedulerContainer", - # # # 调度器 + # # # # 调度器 # "scheduler_class": "smart.scheduler.AsyncScheduler", } } @@ -35,12 +36,16 @@ class IpSpider(Spider): def start_requests(self): for page in range(100): url = f'http://exercise.kingname.info/exercise_middleware_ip/{page}' - yield Request(url, callback=self.parse, dont_filter=False, timeout=9) + yield Request(url, callback=self.parse, dont_filter=False, timeout=3) - def parse(self, response: Response): + async def parse(self, response: Response): print(response.text) + # seeds=[random.randint(1,1000000) for _ in range(10000)] + # for page in seeds: + # url = f'http://exercise.kingname.info/exercise_middleware_ip/{page}' + # yield Request(url, callback=self.parse2, dont_filter=True, timeout=3) item = TestItem.get_item("") - # yield item + yield item # yield Request(url=response.url + "2", callback=self.parse2, dont_filter=False, timeout=9) # yield item @@ -57,8 +62,9 @@ def parse(self, response: Response): # print(response.status) # yield Request(response.url, callback=self.parse2, dont_filter=True) - def parse2(self, response): - print(response.status) + async def parse2(self, response): + print(response.text) + print("parse2222") item = TestItem.get_item("") yield item diff --git a/test/aio.py b/test/aio.py index 8559466..0470010 100644 --- a/test/aio.py +++ b/test/aio.py @@ -24,7 +24,7 @@ async def fetch(url): async def do(): tasks = [] - for page in range(1000): + for page in range(100): url = f'http://exercise.kingname.info/exercise_middleware_ip/{page}' url = "https://www.baidu.com?q=" + str(page) task = asyncio.ensure_future(fetch(url)) @@ -35,10 +35,19 @@ async def do(): print(t.result()) +async def gennum(): + yield 1 + + +async def aaa(): + x = gennum() + print(x) + + if __name__ == '__main__': start = time.time() loop = asyncio.ProactorEventLoop() - loop.run_until_complete(do()) + loop.run_until_complete(aaa()) end = time.time() print('花费') print(end - start) diff --git a/test/aw.py b/test/aw.py new file mode 100644 index 0000000..b3c283c --- /dev/null +++ b/test/aw.py @@ -0,0 +1,59 @@ +# -*- coding utf-8 -*-# +# ------------------------------------------------------------------ +# Name: aw +# Author: liangbaikai +# Date: 2021/1/22 +# Desc: there is a python file description +# ------------------------------------------------------------------ +import asyncio + + +async def consumer(condition, n): + async with condition: + print('consumer {} is waiting'.format(n)) + await condition.wait() + print('consumer {} 
triggered'.format(n))
+    print('ending consumer {}'.format(n))
+
+
+async def manipulate_condition(condition):
+    print('starting manipulate_condition')
+
+    # pause to let consumers start
+    await asyncio.sleep(0.1)
+
+    for i in range(1, 3):
+        async with condition:
+            print('notifying {} consumers'.format(i))
+            condition.notify(n=i)
+        await asyncio.sleep(0.1)
+
+    async with condition:
+        print('notifying remaining consumers')
+        condition.notify_all()
+
+    print('ending manipulate_condition')
+
+
+async def main(loop):
+    # Create a condition
+    condition = asyncio.Condition()
+
+    # Set up tasks watching the condition
+    consumers = [
+        consumer(condition, i)
+        for i in range(5)
+    ]
+
+    # Schedule a task to manipulate the condition variable
+    loop.create_task(manipulate_condition(condition))
+
+    # Wait for the consumers to be done
+    await asyncio.wait(consumers)
+
+
+event_loop = asyncio.get_event_loop()
+try:
+    result = event_loop.run_until_complete(main(event_loop))
+finally:
+    event_loop.close()
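The new AioRedisSchuler earlier in this patch round-trips pickled Request objects through a text-mode Redis connection: push() decodes the pickle bytes with latin1 before rpush, and pop() re-encodes the stored string with latin1 before pickle.loads. A minimal sketch of why that round trip is lossless, using only the standard library (a plain dict stands in for a smart Request here):

import pickle

# stand-in payload; any picklable object behaves the same way
payload = {"url": "http://exercise.kingname.info/exercise_middleware_ip/1", "retry": 0}

raw = pickle.dumps(payload)                          # bytes handed to rpush
as_text = raw.decode("latin1")                       # what a text-mode redis client stores
restored = pickle.loads(as_text.encode("latin1"))    # what pop() rebuilds

assert restored == payload

latin1 maps every byte value 0-255 to the code point of the same value, so the decode/encode pair preserves the pickle stream exactly; re-encoding with any other codec, or skipping the encode before pickle.loads, would corrupt the payload.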