Commit

dev test

liangbaika committed Jan 20, 2021
1 parent f00f0de commit f82591f
Showing 18 changed files with 940 additions and 111 deletions.
8 changes: 6 additions & 2 deletions launcher.py
@@ -4,6 +4,8 @@
import time
from multiprocessing.pool import Pool

+from blinker import Signal
+
from smart.log import log
from smart.pipline import Piplines
from smart.runer import CrawStater
@@ -23,13 +25,15 @@

@piplinestest.pipline(1)
async def do_pip(spider_ins, item):
+    await asyncio.sleep(1)
    print(f"I am item1111111 {item.results}")
    return item


@piplinestest.pipline(2)
-def pip2(spider_ins, item):
+async def pip2(spider_ins, item):
    print(f"I am item2222222 {item.results}")
+    await asyncio.sleep(0.5)
    return item


@@ -136,6 +140,6 @@ def test11(sender, **kwargs):
spider2 = JsonSpider()
js_spider = JsSpider()
spider = IpSpider()
spider22 = IpSpider()
starter.run_many([spider], middlewire=middleware2, pipline=piplinestest)
# starter.run_many([spider])

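The launcher.py hunk adds "from blinker import Signal" and keeps receivers shaped like test11(sender, **kwargs), which suggests event hooks are wired through blinker signals. A minimal sketch of that pattern, assuming a hypothetical signal name spider_started (the commit only shows the import and a receiver):

    from blinker import Signal

    # hypothetical signal; not part of the commit
    spider_started = Signal()

    def test11(sender, **kwargs):
        # a receiver always gets the sender plus any keyword payload
        print(f"signal from {sender}: {kwargs}")

    spider_started.connect(test11)
    spider_started.send("launcher", spider="IpSpider")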
6 changes: 3 additions & 3 deletions launcher_ds.py
@@ -28,7 +28,7 @@ async def do_pip(spider_ins, item):


@piplinestest.pipline(2)
-def pip2(spider_ins, item):
+async def pip2(spider_ins, item):
    print(f"I am item2222222 {item.results}")
    return item

@@ -138,10 +138,10 @@ def xxx(sender, **kwargs):
gloable_setting_dict.update(
    duplicate_filter_class="spiders.distributed.RedisBaseDuplicateFilter",
    scheduler_container_class="spiders.distributed.RedisSchuler",
-    pipline_is_paralleled=1
+    pipline_is_paralleled=1,
+    is_single=0,
)

spider = IpSpider()
starter.run_many([spider], middlewire=middleware2, pipline=piplinestest)
# starter.run_many([spider])

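Both launchers turn pip2 into a coroutine, so whatever drives the pipeline chain has to accept sync and async functions alike. The Piplines implementation is not part of this diff; a minimal sketch of such a dispatcher, under that assumption:

    import asyncio
    import inspect

    async def run_pipeline(func, spider_ins, item):
        # await coroutine pipelines directly; run plain functions in a
        # worker thread so they do not block the event loop
        if inspect.iscoroutinefunction(func):
            return await func(spider_ins, item)
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(None, func, spider_ins, item)

    # usage: item = await run_pipeline(pip2, spider_ins, item)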
8 changes: 8 additions & 0 deletions smart/buffer/__init__.py
@@ -0,0 +1,8 @@
# -*- coding: utf-8 -*-
# ------------------------------------------------------------------
# Name: __init__.py
# Author: liangbaikai
# Date: 2021/1/18
# Desc: there is a python file description
# ------------------------------------------------------------------

128 changes: 128 additions & 0 deletions smart/buffer/request_buffer.py
@@ -0,0 +1,128 @@
# -*- coding: utf-8 -*-
# ------------------------------------------------------------------
# Name: request_buffer
# Author: liangbaikai
# Date: 2021/1/18
# Desc: there is a python file description
# ------------------------------------------------------------------
import collections
import threading
import time
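
# NOTE (assumption): RedisDB, setting and log are referenced below but never
# imported in this new file; they presumably come from elsewhere in the project.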

MAX_URL_COUNT = 100  # maximum number of requests kept in the buffer


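# naive process-level singleton: every instantiation returns the one shared instance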
class Singleton(object):
    def __new__(cls, *args, **kwargs):
        if not hasattr(cls, "_inst"):
            cls._inst = super(Singleton, cls).__new__(cls)
        return cls._inst


class RequestBuffer(threading.Thread, Singleton):
    dedup = None

    def __init__(self, table_folder):
        if not hasattr(self, "_requests_deque"):
            super(RequestBuffer, self).__init__()

            self._thread_stop = False
            self._is_adding_to_db = False
            # assumption: the sorted-set key comes from table_folder; this new
            # file uses self._table_request later but never assigns it
            self._table_request = table_folder

            self._requests_deque = collections.deque()
            self._del_requests_deque = collections.deque()
            self._db = RedisDB()

    def run(self):
        while not self._thread_stop:
            try:
                self.__add_request_to_db()
            except Exception as e:
                print(e)
            time.sleep(5)

    def stop(self):
        self._thread_stop = True

    def put_request(self, request):
        self._requests_deque.append(request)

        if self.get_requests_count() > MAX_URL_COUNT:  # over the cache limit, flush proactively
            self.flush()

    def put_del_request(self, request):
        self._del_requests_deque.append(request)

    def flush(self):
        try:
            self.__add_request_to_db()
        except Exception as e:
            print(e)

    def get_requests_count(self):
        return len(self._requests_deque)

    def is_adding_to_db(self):
        return self._is_adding_to_db

    def __add_request_to_db(self):
        request_list = []
        prioritys = []
        callbacks = []

        while self._requests_deque:
            request = self._requests_deque.popleft()
            self._is_adding_to_db = True

            if callable(request):
                # a plain callback function.
                # Note: watch out for closures here. A closure callback should
                # be written as
                #   def test(xxx=xxx):
                #       ...  # business logic using xxx
                # so that xxx is not the loop variable's final value once the
                # loop has finished
                callbacks.append(request)
                continue

            priority = request.priority

            # if deduplication is enabled and the request already exists, skip it
            if (
                    request.filter_repeat
                    and setting.REQUEST_FILTER_ENABLE
                    and not self.__class__.dedup.add(request.fingerprint)
            ):
                continue
            else:
                request_list.append(str(request.to_dict))
                prioritys.append(priority)

            if len(request_list) > MAX_URL_COUNT:
                self._db.zadd(self._table_request, request_list, prioritys)
                request_list = []
                prioritys = []

        # persist whatever is left
        if request_list:
            self._db.zadd(self._table_request, request_list, prioritys)

        # run the queued callbacks
        for callback in callbacks:
            try:
                callback()
            except Exception as e:
                log.exception(e)

        # remove completed requests
        if self._del_requests_deque:
            request_done_list = []
            while self._del_requests_deque:
                request_done_list.append(self._del_requests_deque.popleft())

            # exclude requests still in request_list, otherwise freshly added
            # requests could be deleted
            request_done_list = list(set(request_done_list) - set(request_list))

            if request_done_list:
                self._db.zrem(self._table_request, request_done_list)

        self._is_adding_to_db = False
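
The closure note in __add_request_to_db refers to Python's late binding: a callback defined inside a loop reads the loop variable's final value unless that value is bound as a default argument. A standalone illustration:

    # late binding: every callback sees the final value of i
    callbacks = [lambda: print(i) for i in range(3)]
    for cb in callbacks:
        cb()  # prints 2, 2, 2

    # a default argument captures the value current at definition time
    callbacks = [lambda i=i: print(i) for i in range(3)]
    for cb in callbacks:
        cb()  # prints 0, 1, 2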
4 changes: 3 additions & 1 deletion smart/core.py
@@ -21,6 +21,7 @@
from smart.request import Request
from smart.scheduler import Scheduler
from smart.setting import gloable_setting_dict
+from smart.signal import reminder


class Engine:
@@ -31,13 +32,14 @@ def __init__(self, spider, middlewire=None, pipline: Piplines = None):
        self.spider = spider
        self.middlewire = middlewire
        self.piplines = pipline
+        self.reminder = reminder
        duplicate_filter_class = self._get_dynamic_class_setting("duplicate_filter_class")
        scheduler_container_class = self._get_dynamic_class_setting("scheduler_container_class")
        net_download_class = self._get_dynamic_class_setting("net_download_class")
        self.scheduler = Scheduler(duplicate_filter_class(), scheduler_container_class())
        req_per_concurrent = self.spider.cutome_setting_dict.get("req_per_concurrent") or gloable_setting_dict.get(
            "req_per_concurrent")
-        self.downloader = Downloader(self.scheduler, self.middlewire, seq=req_per_concurrent,
+        self.downloader = Downloader(self.scheduler, self.middlewire, seq=req_per_concurrent, reminder=self.reminder,
                                     downer=net_download_class())
        self.request_generator_queue = deque()
        self.stop = False
27 changes: 13 additions & 14 deletions smart/core2.py
@@ -21,6 +21,7 @@
from smart.item import Item
from smart.pipline import Piplines
from smart.request import Request
+from smart.response import Response
from smart.scheduler import Scheduler, DequeSchedulerContainer
from smart.setting import gloable_setting_dict
from smart.signal import reminder, Reminder
@@ -62,7 +63,7 @@ def _get_dynamic_class_setting(self, key):
            key)
        _module = importlib.import_module(".".join(class_str.split(".")[:-1]))
        _class = getattr(_module, class_str.split(".")[-1])
-        self.log.info("_class:"+_class.__name__)
+        self.log.info("_class:" + _class.__name__)
        return _class

    def iter_request(self):
@@ -125,6 +126,10 @@ async def start(self):
            if self.lock and self.lock.locked():
                await asyncio.sleep(0.5)
                continue
+            if self.is_single:
+                if len(self.task_dict) > 1500:
+                    await asyncio.sleep(0.3)
+
            request_or_item = next(self.iter_request())

            if isinstance(request_or_item, Request):
@@ -141,8 +146,8 @@
f" here is no request and the task has been completed.so engine will stop ..")
self.stop = True
break
if request_or_item is None:
await asyncio.sleep(0.04)
# if request_or_item is None:
await asyncio.sleep(0.001)

            if self.spider.state != "runing":
                self.spider.state = "runing"
@@ -159,11 +164,9 @@

    async def work(self):
        while not self.stop:
-            if not self.is_single:
-                if len(self.task_dict) > 2000:
-                    await asyncio.sleep(0.03)
+            if not self.is_single and self.scheduler.scheduler_container.size() <= 10:
+                await asyncio.sleep(0.06)
+            # if not self.is_single:
+            #     if len(self.task_dict) > 2000:
+            #         await asyncio.sleep(0.03)
            request = self.scheduler.get()
            if isinstance(request, Request):
                setattr(request, "__spider__", self.spider)
@@ -178,12 +181,7 @@
                # let the downloader get scheduled; tests showed 0.001-0.0006 works best
                # uniform = random.uniform(0.0001, 0.006)
                # await asyncio.sleep(0.0005)
-                if not self.is_single:
-                    # distributed mode: more tasks can be pushed into the shared queue here
-                    await asyncio.sleep(0.017)
-                else:
-                    # single-machine mode: schedule as aggressively as possible
-                    await asyncio.sleep(0.001)
+                await asyncio.sleep(0.001)
                continue
            custome_callback = resp.request.callback
            if custome_callback:
@@ -232,6 +230,7 @@ def _ensure_future(self, request: Request):
        self.task_dict[key] = task
        task.add_done_callback(self._check_complete_callback)


    def _handle_exception(self, spider, e):
        if spider:
            try:
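_get_dynamic_class_setting above resolves dotted class paths such as spiders.distributed.RedisBaseDuplicateFilter with importlib. The same pattern in isolation, as a generic sketch rather than the engine's exact code:

    import importlib

    def load_class(dotted_path: str):
        # "pkg.module.ClassName" -> import pkg.module, then fetch ClassName
        module_path, _, class_name = dotted_path.rpartition(".")
        module = importlib.import_module(module_path)
        return getattr(module, class_name)

    # e.g. load_class("collections.OrderedDict") returns the OrderedDict class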