Skip to content

Commit

Permalink
dev test
Browse files Browse the repository at this point in the history
  • Loading branch information
liangbaika committed Jan 25, 2021
1 parent f82591f commit 5810151
Show file tree
Hide file tree
Showing 15 changed files with 1,404 additions and 63 deletions.
22 changes: 17 additions & 5 deletions launcher_ds.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
import asyncio
import atexit
import multiprocessing
import threading
import time
from datetime import datetime
from multiprocessing.pool import Pool

from smart.log import log
Expand Down Expand Up @@ -130,18 +132,28 @@ def xxx(sender, **kwargs):
print("spider_start")


def main():
    """Configure the shared crawler settings and run the IP spider once.

    This function is defined at module top level, not under the
    ``__main__`` guard, so that ``multiprocessing`` worker processes that
    re-import this module can still look it up by name.
    """
    starter = CrawStater()
    # NOTE(review): these three spiders are built but never handed to
    # ``run_many``; they are kept in case their constructors have side
    # effects -- confirm and remove if they do not.
    spider1 = GovsSpider()
    spider2 = JsonSpider()
    js_spider = JsSpider()
    # Every setting is passed exactly once: repeating a keyword inside a
    # single call is a SyntaxError, so only the aio-based Redis classes
    # remain for the duplicate filter and the scheduler container.
    gloable_setting_dict.update(
        duplicate_filter_class="spiders.distributed.AioRedisBaseDuplicateFilter",
        scheduler_container_class="spiders.distributed.AioRedisSchuler",
        pipline_is_paralleled=1,
        is_single=0,
    )

    spider = IpSpider()
    starter.run_many([spider], middlewire=middleware2, pipline=piplinestest)


if __name__ == '__main__':
    # Launch several identical crawler processes and report the total
    # wall-clock time once all of them have finished.
    worker_count = 4
    start = time.time()
    pool = multiprocessing.Pool(worker_count)
    for _ in range(worker_count):
        # The AsyncResult is not kept, so without an error_callback an
        # exception raised inside a worker would vanish silently.
        pool.apply_async(
            main,
            error_callback=lambda exc: print(f'worker failed: {exc!r}'),
        )
    # main()  # single-process alternative, handy when debugging
    pool.close()
    pool.join()
    print(f'结束 花费{time.time() - start}s')

# starter.run_many([spider])
47 changes: 28 additions & 19 deletions smart/core4.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,13 +58,14 @@ def __init__(self, spider, middlewire=None, pipline: Piplines = None):
"pipline_is_paralleled") if pipline_is_paralleled is None else pipline_is_paralleled
self.pipline_is_paralleled = pipline_is_paralleled

self.lock1 = asyncio.Lock()
self.lock2 = asyncio.Lock()

def _get_dynamic_class_setting(self, key):
    """Resolve the dotted class path stored under ``key`` and return the class.

    The spider's own ``cutome_setting_dict`` is consulted first; a missing
    or falsy value falls back to ``gloable_setting_dict``.

    :param key: setting name whose value is a dotted path such as
        ``"package.module.ClassName"``.
    :return: the class object the dotted path points to.
    :raises ValueError: if no value is configured for ``key`` or the value
        has no module part.
    :raises ImportError: if the module part cannot be imported.
    :raises AttributeError: if the module does not define the class.
    """
    class_str = self.spider.cutome_setting_dict.get(key) or gloable_setting_dict.get(key)
    if not class_str:
        # Fail with a readable message instead of an AttributeError on None.
        raise ValueError(f"no class path configured for setting {key!r}")
    # Split on the last dot: everything before it is the module path.
    module_path, _, class_name = class_str.rpartition(".")
    if not module_path:
        raise ValueError(f"setting {key!r} must be a dotted 'module.Class' path, got {class_str!r}")
    _module = importlib.import_module(module_path)
    _class = getattr(_module, class_name)
    self.log.info("_class:" + _class.__name__)
    self.log.info(f"dynamic loaded key【{key}】--> class【{class_str}】success")
    return _class

def iter_request(self):
Expand Down Expand Up @@ -137,22 +138,21 @@ async def start(self):
if wait is not None and inspect.isawaitable(wait):
waited = True
await wait

# if not wait:
# waited = True
# await asyncio.sleep(0.1)

await asyncio.sleep(0.001)
if self._check_can_stop(None):
if not waited:
await asyncio.sleep(0.001)
can_stop = await self._check_can_stop(None)
if can_stop:
# No pending requests remain and every task has completed, so the engine stops.
self.log.debug(" here is no request and the task has been completed.so engine will stop ..")
self.stop = True
break
if self.spider.state != "runing":
self.spider.state = "runing"

if not waited:
await asyncio.sleep(0.02)
# if not waited:
# await asyncio.sleep(0.001)
# if user_func_res is None or not wait:
# await self.lock1.acquire()

self.spider.state = "closed"
self.reminder.go(Reminder.spider_close, self.spider)
Expand All @@ -174,6 +174,10 @@ async def work(self):
if isinstance(request, Request):
setattr(request, "__spider__", self.spider)
self.reminder.go(Reminder.request_scheduled, request)
# if waited:
# resp = await self._ensure_future_special(request)
# else:
# self._ensure_future(request)
if waited:
resp = await self._ensure_future_special(request)
else:
Expand All @@ -184,7 +188,7 @@ async def work(self):

if resp is None:
if not waited:
await asyncio.sleep(0.1)
await asyncio.sleep(0.004)
# Yield briefly so the downloader can be scheduled; in testing, 0.001-0.0006 worked better.
# uniform = random.uniform(0.0001, 0.006)
if not seed:
Expand Down Expand Up @@ -254,7 +258,7 @@ def _handle_exception(self, spider, e):
except BaseException:
pass

def _check_can_stop(self, request):
async def _check_can_stop(self, request):
if request:
return False
if len(self.task_dict) > 0:
Expand All @@ -267,15 +271,18 @@ def _check_can_stop(self, request):
return False
if self.downloader.response_queue.qsize() > 0:
return False
if len(self.request_generator_queue) > 0 and self.scheduler.scheduler_container.size() > 0:
size = self.scheduler.scheduler_container.size()
if inspect.isawaitable(size):
size = await size
if len(self.request_generator_queue) > 0 and size > 0:
return False
if self.scheduler.scheduler_container.size() > 0:
if size > 0:
return False
start = time.time()
self.reminder.go(Reminder.engin_idle, self)
while not self.is_single:
end = time.time()
period = 10
period = 5
if (end - start) > period:
self.log.info(f"empty loop {period}s second so stop")
break
Expand Down Expand Up @@ -319,8 +326,10 @@ def _hand_piplines(self, spider_ins, item, index=0, paralleled=False):
async def handle_item(self):
while not self.stop:
if self.item_queue.qsize() <= 0:
await asyncio.sleep(0.5)
await asyncio.sleep(0.2)

item = await self.item_queue.get()

self.item_queue.task_done()
# item = self.item_queue.get_nowait()
self._hand_piplines(self.spider, item, paralleled=self.pipline_is_paralleled)
Loading

0 comments on commit 5810151

Please sign in to comment.