Skip to content

Commit 2d55a38

Browse files
authored
Merge pull request scrapy#176 from scrapinghub/kafka-codec-fix
fixing value check
2 parents 7dd6165 + 7dfa4a4 commit 2d55a38

File tree

8 files changed

+66
-17
lines changed

8 files changed

+66
-17
lines changed

docs/source/topics/frontera-settings.rst

+31
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,37 @@ Default: ``'frontera.contrib.backends.memory.FIFO'``
8787
The :class:`Backend <frontera.core.components.Backend>` to be used by the frontier. For more info see
8888
:ref:`Activating a backend <frontier-activating-backend>`.
8989

90+
91+
.. setting:: BC_MIN_REQUESTS
92+
93+
BC_MIN_REQUESTS
94+
---------------
95+
96+
Default: ``64``
97+
98+
The broad crawling queue get operation will keep retrying until the specified number of requests is collected. The maximum number
99+
of retries is hard-coded to 3.
100+
101+
.. setting:: BC_MIN_HOSTS
102+
103+
BC_MIN_HOSTS
104+
------------
105+
106+
Default: ``24``
107+
108+
Keep retrying when getting requests from the queue, until there are requests for the specified minimum number of hosts
109+
collected. Maximum number of retries is hard-coded and equals 3.
110+
111+
.. setting:: BC_MAX_REQUESTS_PER_HOST
112+
113+
BC_MAX_REQUESTS_PER_HOST
114+
------------------------
115+
116+
Default: ``128``
117+
118+
Don't include (if possible) batches of requests containing requests for a specific host if there are already more than the
119+
specified count of maximum requests per host. This is a suggestion for broad crawling queue get algorithm.
120+
90121
.. setting:: CANONICAL_SOLVER
91122

92123
CANONICAL_SOLVER

docs/source/topics/frontier-backends.rst

+6
Original file line numberDiff line numberDiff line change
@@ -283,6 +283,12 @@ tunning a block cache to fit states within one block for average size website. T
283283
to achieve documents closeness within the same host. This function can be selected with :setting:`URL_FINGERPRINT_FUNCTION`
284284
setting.
285285

286+
.. TODO: document details of block cache tuning,
287+
BC* settings and queue get operation concept,
288+
hbase tables schema and data flow
289+
Queue exploration
290+
shuffling with MR jobs
291+
286292
.. _FIFO: http://en.wikipedia.org/wiki/FIFO
287293
.. _LIFO: http://en.wikipedia.org/wiki/LIFO_(computing)
288294
.. _DFS: http://en.wikipedia.org/wiki/Depth-first_search

frontera/contrib/backends/hbase.py

+8-2
Original file line numberDiff line numberDiff line change
@@ -376,6 +376,10 @@ def __init__(self, manager):
376376
port = settings.get('HBASE_THRIFT_PORT')
377377
hosts = settings.get('HBASE_THRIFT_HOST')
378378
namespace = settings.get('HBASE_NAMESPACE')
379+
self._min_requests = settings.get('BC_MIN_REQUESTS')
380+
self._min_hosts = settings.get('BC_MIN_HOSTS')
381+
self._max_requests_per_host = settings.get('BC_MAX_REQUESTS_PER_HOST')
382+
379383
self.queue_partitions = settings.get('SPIDER_FEED_PARTITIONS')
380384
host = choice(hosts) if type(hosts) in [list, tuple] else hosts
381385
kwargs = {
@@ -456,8 +460,10 @@ def get_next_requests(self, max_next_requests, **kwargs):
456460
for partition_id in range(0, self.queue_partitions):
457461
if partition_id not in partitions:
458462
continue
459-
results = self.queue.get_next_requests(max_next_requests, partition_id, min_requests=64,
460-
min_hosts=24, max_requests_per_host=128)
463+
results = self.queue.get_next_requests(max_next_requests, partition_id,
464+
min_requests=self._min_requests,
465+
min_hosts=self._min_hosts,
466+
max_requests_per_host=self._max_requests_per_host)
461467
next_pages.extend(results)
462468
self.logger.debug("Got %d requests for partition id %d", len(results), partition_id)
463469
return next_pages

frontera/contrib/messagebus/kafkabus.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -223,7 +223,7 @@ def __init__(self, settings):
223223
if codec == 'gzip':
224224
from kafka.protocol import CODEC_GZIP
225225
self.codec = CODEC_GZIP
226-
if not self.codec:
226+
if self.codec is None:
227227
raise NameError("Non-existent Kafka compression codec.")
228228

229229
self.conn = KafkaClient(server)

frontera/core/manager.py

+10-9
Original file line numberDiff line numberDiff line change
@@ -10,17 +10,19 @@
1010

1111
class ComponentsPipelineMixin(object):
1212
def __init__(self, backend, middlewares=None, canonicalsolver=None, db_worker=False, strategy_worker=False):
13+
self._logger_components = logging.getLogger("manager.components")
14+
1315
# Load middlewares
1416
self._middlewares = self._load_middlewares(middlewares)
1517

1618
# Load canonical solver
17-
self._logger.debug("Loading canonical url solver '%s'", canonicalsolver)
19+
self._logger_components.debug("Loading canonical url solver '%s'", canonicalsolver)
1820
self._canonicalsolver = self._load_object(canonicalsolver)
1921
assert isinstance(self.canonicalsolver, CanonicalSolver), \
2022
"canonical solver '%s' must subclass CanonicalSolver" % self.canonicalsolver.__class__.__name__
2123

2224
# Load backend
23-
self._logger.debug("Loading backend '%s'", backend)
25+
self._logger_components.debug("Loading backend '%s'", backend)
2426
self._backend = self._load_backend(backend, db_worker, strategy_worker)
2527

2628
@property
@@ -67,14 +69,14 @@ def _load_middlewares(self, middleware_names):
6769
# TO-DO: Use dict for middleware ordering
6870
mws = []
6971
for mw_name in middleware_names or []:
70-
self._logger.debug("Loading middleware '%s'", mw_name)
72+
self._logger_components.debug("Loading middleware '%s'", mw_name)
7173
try:
7274
mw = self._load_object(mw_name, silent=False)
7375
assert isinstance(mw, Middleware), "middleware '%s' must subclass Middleware" % mw.__class__.__name__
7476
if mw:
7577
mws.append(mw)
7678
except NotConfigured:
77-
self._logger.warning("middleware '%s' disabled!", mw_name)
79+
self._logger_components.warning("middleware '%s' disabled!", mw_name)
7880

7981
return mws
8082

@@ -89,15 +91,14 @@ def _process_components(self, method_name, obj=None, return_classes=None, **kwar
8991
if check_response:
9092
return_obj = result
9193
if check_response and obj and not return_obj:
92-
self._logger.warning("Object '%s' filtered in '%s' by '%s'",
93-
obj.__class__.__name__, method_name, component.__class__.__name__
94-
)
94+
self._logger_components.warning("Object '%s' filtered in '%s' by '%s'",
95+
obj.__class__.__name__, method_name, component.__class__.__name__)
9596
return
9697
return return_obj
9798

9899
def _process_component(self, component, method_name, component_category, obj, return_classes, **kwargs):
99-
self._logger.debug("processing '%s' '%s.%s' %s",
100-
method_name, component_category, component.__class__.__name__, obj)
100+
self._logger_components.debug("processing '%s' '%s.%s' %s",
101+
method_name, component_category, component.__class__.__name__, obj)
101102
return_obj = getattr(component, method_name)(*([obj] if obj else []), **kwargs)
102103
assert return_obj is None or isinstance(return_obj, return_classes), \
103104
"%s '%s.%s' must return None or %s, Got '%s'" % \

frontera/settings/default_settings.py

+3
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,9 @@
33

44
AUTO_START = True
55
BACKEND = 'frontera.contrib.backends.memory.FIFO'
6+
BC_MIN_REQUESTS = 64
7+
BC_MIN_HOSTS = 24
8+
BC_MAX_REQUESTS_PER_HOST = 128
69
CANONICAL_SOLVER = 'frontera.contrib.canonicalsolvers.Basic'
710
DELAY_ON_EMPTY = 5.0
811
DOMAIN_FINGERPRINT_FUNCTION = 'frontera.utils.fingerprint.sha1'

frontera/worker/db.py

+2-2
Original file line numberDiff line numberDiff line change
@@ -144,12 +144,12 @@ def consume_incoming(self, *args, **kwargs):
144144
if type == 'page_crawled':
145145
_, response, links = msg
146146
logger.debug("Page crawled %s", response.url)
147-
if response.meta['jid'] != self.job_id:
147+
if 'jid' not in response.meta or response.meta['jid'] != self.job_id:
148148
continue
149149
self._backend.page_crawled(response, links)
150150
if type == 'request_error':
151151
_, request, error = msg
152-
if request.meta['jid'] != self.job_id:
152+
if 'jid' not in request.meta or request.meta['jid'] != self.job_id:
153153
continue
154154
logger.debug("Request error %s", request.url)
155155
self._backend.request_error(request, error)

frontera/worker/strategy.py

+5-3
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
# -*- coding: utf-8 -*-
22
from time import asctime
33
import logging
4-
from traceback import format_stack
4+
from traceback import format_stack, format_tb
55
from signal import signal, SIGUSR1
66
from logging.config import fileConfig
77
from argparse import ArgumentParser
@@ -170,14 +170,14 @@ def work(self):
170170

171171
if type == 'page_crawled':
172172
_, response, links = msg
173-
if response.meta['jid'] != self.job_id:
173+
if 'jid' not in response.meta or response.meta['jid'] != self.job_id:
174174
continue
175175
self.on_page_crawled(response, links)
176176
continue
177177

178178
if type == 'request_error':
179179
_, request, error = msg
180-
if request.meta['jid'] != self.job_id:
180+
if 'jid' not in request.meta or request.meta['jid'] != self.job_id:
181181
continue
182182
self.on_request_error(request, error)
183183
continue
@@ -203,6 +203,8 @@ def work(self):
203203
def run(self):
204204
def errback(failure):
205205
logger.exception(failure.value)
206+
if failure.frames:
207+
logger.critical(str("").join(format_tb(failure.getTracebackObject())))
206208
self.task.start(interval=0).addErrback(errback)
207209

208210
def debug(sig, frame):

0 commit comments

Comments
 (0)