Fix of requests propagation to Scheduler from middlewares and engine #276

Open · wants to merge 5 commits into base: master

Changes from all commits
2 changes: 1 addition & 1 deletion docs/source/topics/frontera-settings.rst
@@ -270,7 +270,7 @@ OVERUSED_SLOT_FACTOR
Default: ``5.0``

(in progress + queued requests in that slot) / max allowed concurrent downloads per slot before slot is considered
overused. This affects only Scrapy scheduler."
overused. This affects only custom Frontera Scrapy scheduler.
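
As a hypothetical worked example (the numbers are illustrative only, assuming a per-slot limit of 8 concurrent downloads)::

    max_concurrency_per_slot = 8     # e.g. Scrapy's CONCURRENT_REQUESTS_PER_DOMAIN
    overused_slot_factor = 5.0       # OVERUSED_SLOT_FACTOR default
    in_progress, queued = 25, 20     # requests in progress and queued for the slot

    overused = (in_progress + queued) / float(max_concurrency_per_slot) > overused_slot_factor
    # True: 45 / 8 = 5.625 > 5.0, so the slot is considered overused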

.. setting:: REQUEST_MODEL

13 changes: 9 additions & 4 deletions docs/source/topics/scrapy-integration.rst
@@ -1,6 +1,6 @@
==============================
Using the Frontier with Scrapy
==============================
==========================
Using Frontera with Scrapy
==========================

Using Frontera is quite easy, it includes a set of `Scrapy middlewares`_ and Scrapy scheduler that encapsulates
Frontera usage and can be easily configured using `Scrapy settings`_.
@@ -9,7 +9,7 @@ Frontera usage and can be easily configured using `Scrapy settings`_.
Activating the frontier
=======================

The Frontera uses 2 different middlewares: ``SchedulerSpiderMiddleware`` and ``SchedulerDownloaderMiddleware``, and it's
The Frontera uses two different middlewares: ``SchedulerSpiderMiddleware`` and ``SchedulerDownloaderMiddleware``, and its
own scheduler ``FronteraScheduler``.

To activate the Frontera in your Scrapy project, just add them to the `SPIDER_MIDDLEWARES`_,
@@ -31,6 +31,8 @@ Create a Frontera ``settings.py`` file and add it to your Scrapy settings::

Another option is to put these settings right into Scrapy settings module.

The Frontera custom scheduler filters out all requests generated by middlewares and the Scrapy engine, except
redirected ones and those explicitly marked as seeds (see `seed_loaders`_).


Organizing files
@@ -161,6 +163,7 @@ Check also `Scrapy broad crawling`_ recommendations.
.. _`Quick start single process`: http://frontera.readthedocs.org/en/latest/topics/quick-start-single.html
.. _`Scrapy broad crawling`: http://doc.scrapy.org/en/master/topics/broad-crawls.html

.. _seed_loaders:

Scrapy Seed Loaders
===================
@@ -170,6 +173,8 @@ Frontera has some built-in Scrapy middlewares for seed loading.
Seed loaders use the ``process_start_requests`` method to generate requests from a source that are added later to the
:class:`FrontierManager <frontera.core.manager.FrontierManager>`.

It's possible to inject seeds from any other spider middleware by setting the ``seed`` key in the request's
``meta`` dictionary to ``True``.
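
For example, a minimal sketch of a custom spider middleware doing this (a hypothetical class, shown only to illustrate the ``seed`` flag)::

    # Hypothetical spider middleware illustrating the 'seed' meta flag; not part of Frontera.
    class MarkStartRequestsAsSeeds(object):

        def process_start_requests(self, start_requests, spider):
            for request in start_requests:
                # FronteraScheduler will forward this request to the frontier as a seed.
                request.meta['seed'] = True
                yield request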

Activating a Seed loader
------------------------
7 changes: 6 additions & 1 deletion frontera/contrib/scrapy/middlewares/seeds/__init__.py
@@ -15,7 +15,12 @@ def from_crawler(cls, crawler):

    def process_start_requests(self, start_requests, spider):
        urls = [url for url in self.load_seeds() if not url.startswith('#')]
        return [spider.make_requests_from_url(url) for url in urls]
        for url in urls:
            r = spider.make_requests_from_url(url)
            r.meta['seed'] = True
            yield r
        for r in start_requests:
            yield r

    def load_seeds(self):
        raise NotImplementedError
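
As a usage sketch, here is a hypothetical subclass of the base class above (assuming it is importable as ``SeedLoader``; the real built-in loaders read seeds from a file or S3 instead):

# Hypothetical seed loader built on the base class shown above; not part of this PR.
from frontera.contrib.scrapy.middlewares.seeds import SeedLoader

class ListSeedLoader(SeedLoader):

    def configure(self, settings):
        # The base class may expect configure() to be overridden; nothing to set up here.
        pass

    def load_seeds(self):
        # '#'-prefixed entries would be filtered out by process_start_requests above.
        return ['https://www.example.com', 'https://www.scrapy.org']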
31 changes: 23 additions & 8 deletions frontera/contrib/scrapy/schedulers/frontier.py
@@ -72,31 +72,43 @@ def _set_value(self, variable, value):


class FronteraScheduler(Scheduler):
"""
Custom Scheduler for Scrapy.
Adapts Frontera manager interface to Scrapy.

Important remarks:
- it doesn't enqueue majority of requests produced by middlewares or direct calls to engine, see self.enqueue_request
method, and override if needed,
- requires SchedulerSpiderMiddleware and SchedulerDownloaderMiddleware.
"""

    def __init__(self, crawler, manager=None):
        self.crawler = crawler
        self.stats_manager = StatsManager(crawler.stats)
        self._pending_requests = deque()
        self.redirect_enabled = crawler.settings.get('REDIRECT_ENABLED')
        settings = ScrapySettingsAdapter(crawler.settings)
        self.frontier = ScrapyFrontierManager(settings, manager)
        self._delay_on_empty = self.frontier.manager.settings.get('DELAY_ON_EMPTY')
        self._redirect_enabled = crawler.settings.get('REDIRECT_ENABLED')
        self._delay_next_call = 0.0
        self.logger = getLogger('frontera.contrib.scrapy.schedulers.FronteraScheduler')
        self.logger = getLogger('frontera.contrib.scrapy.schedulers.frontier.FronteraScheduler')

    @classmethod
    def from_crawler(cls, crawler):
        return cls(crawler)

    def enqueue_request(self, request):
Contributor:

Would it be even clearer to tag requests that are enqueued in the frontier through process_spider_output -> links_extracted (storing the tag in meta like you did for seed) and add any other non-tagged request to the local queue? This way you wouldn't need to check for redirects, and other middlewares could themselves choose between local and remote scheduling (such as a middleware that logs in and retries the request without going through the frontier queue again).

Member Author:

I believe adding a middleware which bypasses the frontier and wants to get its requests into the local queue requires planning from the user operating the crawler. They would need to understand all the consequences of this. If we take your login example, the user would need to deal with the response that comes after a previously unseen login request. The Frontera custom scheduler will crash on this, because it lacks frontier_request in meta. Therefore the middleware which logs in would need to intercept this response or figure out something else.

Frontera already has an ancient mechanism for tagging requests
https://github.com/scrapinghub/frontera/blob/master/frontera/contrib/scrapy/converters.py#L41
but at the moment it's not used.

Member Author:

I also see other cases: redirects, or a website that requires obtaining a session. In any of these cases I expect the user to look into the Frontera custom scheduler code and make the appropriate changes. I see such customization as an advanced topic, requiring some expert knowledge.
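
For reference, a minimal sketch of that kind of customization (a hypothetical subclass, not part of this PR), letting a middleware ask for local scheduling through a ``local`` meta flag:

# Hypothetical subclass illustrating the customization discussed above; not part of this PR.
class LocalAwareFronteraScheduler(FronteraScheduler):

    def enqueue_request(self, request):
        if request.meta.get('local', False):
            # Bypass the frontier and keep the request in the scheduler's in-memory queue.
            self._add_pending_request(request)
            return True
        return super(LocalAwareFronteraScheduler, self).enqueue_request(request)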

        if not self._request_is_redirected(request):
            self.frontier.add_seeds([request])
            self.stats_manager.add_seeds()
            return True
        elif self.redirect_enabled:
        # add directly to in-memory queue if request comes as part of redirect chain from RedirectMiddleware
        if self._redirect_enabled and self._request_is_redirected(request):
            self._add_pending_request(request)
            self.stats_manager.add_redirected_requests()
            return True
        # add as seed if request is explicitly marked as seed
        if self._request_is_seed(request):
            self.frontier.add_seeds([request])
            self.stats_manager.add_seeds()
            return True
        self.logger.warning("Request to URL %s is skipped.", request.url)
        return False

    def next_request(self):
@@ -166,7 +178,10 @@ def _get_exception_code(self, exception):
return '?'

    def _request_is_redirected(self, request):
        return request.meta.get(b'redirect_times', 0) > 0
        return request.meta.get('redirect_times', 0) > 0

    def _request_is_seed(self, request):
        return bool(request.meta.get('seed', False))

    def _get_downloader_info(self):
        downloader = self.crawler.engine.downloader
5 changes: 4 additions & 1 deletion tests/scrapy_spider/spiders/example.py
@@ -1,7 +1,7 @@
from __future__ import absolute_import
from scrapy.linkextractors import LinkExtractor
from scrapy.spiders import CrawlSpider, Rule

from scrapy.http import Request

class MySpider(CrawlSpider):
    name = 'example'
@@ -18,4 +18,7 @@ def parse_page(self, response):
    def parse_nothing(self, response):
        pass

    def make_requests_from_url(self, url):
        return Request(url, dont_filter=True, meta={'seed': True})

    parse_start_url = parse_nothing
28 changes: 12 additions & 16 deletions tests/test_frontera_scheduler.py
@@ -11,15 +11,15 @@


# test requests
r1 = Request('http://www.example.com')
r2 = Request('https://www.example.com/some/page')
r3 = Request('http://example1.com')
r1 = Request('http://www.example.com', meta={'seed': True})
r2 = Request('https://www.example.com/some/page', meta={'seed': True})
r3 = Request('http://example1.com', meta={'seed': True})


# test requests with redirects
rr1 = Request('http://www.example.com', meta={b'redirect_times': 1})
rr2 = Request('https://www.example.com/some/page', meta={b'redirect_times': 4})
rr3 = Request('http://example1.com', meta={b'redirect_times': 0})
rr1 = Request('http://www.example.com', meta={'redirect_times': 1})
rr2 = Request('https://www.example.com/some/page', meta={'redirect_times': 4})
rr3 = Request('http://example1.com', meta={'redirect_times': 0})


# test frontier requests
@@ -49,11 +49,10 @@ def test_redirect_disabled_enqueue_requests(self):
        fs.open(Spider)
        assert fs.enqueue_request(rr1) is False
        assert fs.enqueue_request(rr2) is False
        assert fs.enqueue_request(rr3) is True
        assert isinstance(fs.frontier.manager.seeds[0], FRequest)
        assert len(fs.frontier.manager.seeds) == 1
        assert fs.frontier.manager.seeds[0].url == rr3.url
        assert fs.stats_manager.stats.get_value('frontera/seeds_count') == 1
        assert fs.enqueue_request(rr3) is False
        assert len(fs.frontier.manager.seeds) == 0
        assert fs.stats_manager.stats.get_value('frontera/seeds_count') == None


    def test_redirect_enabled_enqueue_requests(self):
        settings = Settings()
@@ -63,13 +62,10 @@ def test_redirect_enabled_enqueue_requests(self):
        fs.open(Spider)
        assert fs.enqueue_request(rr1) is True
        assert fs.enqueue_request(rr2) is True
        assert fs.enqueue_request(rr3) is True
        assert len(fs.frontier.manager.seeds) == 1
        assert isinstance(fs.frontier.manager.seeds[0], FRequest)
        assert fs.frontier.manager.seeds[0].url == rr3.url
        assert fs.enqueue_request(rr3) is False
        assert set([request.url for request in fs._pending_requests]) == set([rr1.url, rr2.url])
        assert all([isinstance(request, Request) for request in fs._pending_requests])
        assert fs.stats_manager.stats.get_value('frontera/seeds_count') == 1
        assert fs.stats_manager.stats.get_value('frontera/seeds_count') == None
        assert fs.stats_manager.stats.get_value('frontera/redirected_requests_count') == 2

    def test_next_request(self):
6 changes: 3 additions & 3 deletions tests/test_seed_loader.py
@@ -49,7 +49,7 @@ def test_load_seeds(self):

    def test_process_start_requests(self):
        seed_loader = self.seed_loader_setup()
        requests = seed_loader.process_start_requests(None, Spider(name='spider'))
        requests = seed_loader.process_start_requests((), Spider(name='spider'))
        self.assertEqual([r.url for r in requests], ['https://www.example.com', 'https://www.scrapy.org'])

    def test_process_start_requests_ignore_comments(self):
@@ -60,7 +60,7 @@ def test_process_start_requests_ignore_comments(self):
# https://www.test.com
"""
        seed_loader = self.seed_loader_setup(seeds_content)
        requests = seed_loader.process_start_requests(None, Spider(name='spider'))
        requests = seed_loader.process_start_requests((), Spider(name='spider'))
        self.assertEqual([r.url for r in requests], ['https://www.example.com', 'https://www.scrapy.org'])


@@ -123,5 +123,5 @@ def mocked_connect_s3(*args, **kwargs):

        with mock.patch('frontera.contrib.scrapy.middlewares.seeds.s3.connect_s3',
                        side_effect=mocked_connect_s3):
            requests = self.seed_loader.process_start_requests(None, Spider(name='spider'))
            requests = self.seed_loader.process_start_requests((), Spider(name='spider'))
            self.assertEqual(set([r.url for r in requests]), set(urls))