Streaming commands / Communication (#3)
* initial streaming

* initial communication protocol

* commands tests

* separating communication logic from protocol

* streaming args and readme
aron-bordin authored and Raul Gallegos committed Jun 1, 2016
1 parent 303d882 commit bea89a7
Showing 16 changed files with 456 additions and 16 deletions.
39 changes: 39 additions & 0 deletions README.md
@@ -7,3 +7,42 @@ The Scrapy Streaming provides an interface to write spiders using any programming

Also, we officially provide helper libraries to develop your spiders using Java, JS, and R.

## Quickstart

You can read a quick tutorial about scrapy-streaming at http://gsoc2016.readthedocs.io/en/latest/quickstart.html

## Usage

You can execute an external spider using the ``streaming`` command, as follows:

scrapy streaming /path/of/executable

and if you need to pass extra arguments, add them using the ``-a`` parameter:

scrapy streaming my_executable -a arg1 -a arg2 -a arg3,arg4

If you want to integrate this spider with a Scrapy project, define it in the ``external.json`` file in the root of the project.
For example, to add a spider developed in Java and a compiled one, ``external.json`` can be defined as:

[
{
"name": "java_spider",
"command": "java",
"args": ["/home/user/MySpider"]
},
{
"name": "compiled_spider",
"command": "/home/user/my_executable"
}
]

and then you can execute them using the ``crawl`` command. Inside the project directory, run:

scrapy crawl spider_name

In this example, ``spider_name`` can be ``java_spider``, ``compiled_spider``, or the name of a regular Scrapy spider.

## Documentation

Documentation is available online at http://gsoc2016.readthedocs.io and in the docs directory.
(Temporary URL; this documentation is from the development fork.)
11 changes: 10 additions & 1 deletion scrapy_streaming/commands/crawl.py
@@ -1,8 +1,17 @@
from scrapy.commands.crawl import Command

from scrapy_streaming.external_spiderloader import ExternalSpiderLoader


class CrawlCommand(Command):
"""
Extends the Scrapy crawl command, adding the possibility to start an external spider using the crawl command
"""
pass

def run(self, args, opts):
try:
super(CrawlCommand, self).run(args, opts)
except KeyError:
spname = args[0]

ExternalSpiderLoader.from_settings(self.settings).crawl(spname)
29 changes: 25 additions & 4 deletions scrapy_streaming/commands/streaming.py
Expand Up @@ -3,6 +3,8 @@
from scrapy.commands import ScrapyCommand
from scrapy.exceptions import UsageError

from scrapy_streaming.external_spiderloader import ExternalSpider, ExternalSpiderLoader


class StreamingCommand(ScrapyCommand):
"""
@@ -17,11 +19,30 @@ def syntax(self):
def short_desc(self):
return "Run an external spider using Scrapy Streaming given its path (doesn't require a project)"

def add_options(self, parser):
super(StreamingCommand, self).add_options(parser)

parser.add_option('-a', '--args', default=[], action='append', metavar="'ARG1,ARG2,...'",
help='set command arguments')

def run(self, args, opts):
if len(args) != 1:
raise UsageError()
filename = args[0]
if not os.path.exists(filename):
raise UsageError("File not found: %s\n" % filename)
command = args[0]

arguments = _parse_arguments(opts.args)
spider = ExternalSpider('StreamingSpider', command, arguments)
loader = ExternalSpiderLoader.from_settings(self.settings, load_spiders=False)

loader.crawl(spider)


def _parse_arguments(list_of_args):
"""
Receives a list of string arguments and splits each argument by comma
"""
args = []
for arg in list_of_args:
args += [argument.strip() for argument in arg.split(',')]

raise NotImplementedError()
return args
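
For reference, ``_parse_arguments`` mirrors the ``-a`` option documented in the README: repeated ``-a`` flags and comma-separated values are flattened into a single list of arguments. A minimal sketch of the expected behaviour (the sample values below are illustrative, not part of the commit):

from scrapy_streaming.commands.streaming import _parse_arguments

# `scrapy streaming my_executable -a arg1 -a arg2 -a arg3,arg4` collects
# opts.args as ['arg1', 'arg2', 'arg3,arg4']; _parse_arguments flattens it.
assert _parse_arguments(['arg1', 'arg2', 'arg3,arg4']) == ['arg1', 'arg2', 'arg3', 'arg4']
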
3 changes: 3 additions & 0 deletions scrapy_streaming/communication/__init__.py
@@ -0,0 +1,3 @@
from scrapy_streaming.communication.map import CommunicationMap
from scrapy_streaming.communication.wrappers import *

65 changes: 65 additions & 0 deletions scrapy_streaming/communication/map.py
@@ -0,0 +1,65 @@
import json

from scrapy.utils.python import to_unicode, to_native_str

from scrapy_streaming.communication import wrappers
from scrapy_streaming.utils import MessageError


class CommunicationMap(object):
"""
Helper class to create the json messages
"""

mapping = {
'spider': wrappers.SpiderMessage,
'request': wrappers.RequestMessage,
'log': wrappers.LogMessage
}

@staticmethod
def parse(line):
try:
msg = json.loads(to_native_str(line))

if not isinstance(msg, dict):
raise MessageError('This message is not a json object.')
if 'type' not in msg:
raise MessageError('"type" field not provided.')

msg_type = msg.pop('type')
try:
return CommunicationMap.mapping[msg_type].from_dict(msg)
except KeyError:
raise MessageError('%s is not a valid message type.' % msg_type)
except ValueError:
raise MessageError('Received message is not a valid json.')

@staticmethod
def ready():
fields = {'type': 'ready', 'status': 'ready'}
return json.dumps(fields)

@staticmethod
def error(message, details):
fields = {'type': 'error',
'received_message': to_unicode(message),
'details': to_unicode(details)}
return json.dumps(fields)

@staticmethod
def response(resp, request_id='parse'):
fields = _extract_fields(resp, ['url', 'headers', 'status', 'body', 'meta', 'flags'])
fields['id'] = to_unicode(request_id)
return json.dumps(fields)


def _extract_fields(item, fields):
"""
Given a list of fields, generate a dict mapping each field name
to the serialized value of item.field
"""
data = {}
for field in fields:
data[field] = json.loads(json.dumps(getattr(item, field)))
return data
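
``CommunicationMap`` defines both directions of the line-based JSON protocol: the static helpers serialize messages sent to the external process, while ``parse`` validates incoming lines and hands them to the matching wrapper. A small sketch of that round trip (payload values are illustrative only):

from scrapy_streaming.communication import CommunicationMap

# Outgoing messages are single-line JSON strings, e.g. roughly
# '{"type": "ready", "status": "ready"}'.
print(CommunicationMap.ready())
print(CommunicationMap.error('{"bad": "msg"}', '"type" field not provided.'))

# Incoming lines must be JSON objects carrying a "type" field; parse()
# returns the matching wrapper instance or raises MessageError.
log = CommunicationMap.parse(b'{"type": "log", "message": "hello", "level": "debug"}')
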
66 changes: 66 additions & 0 deletions scrapy_streaming/communication/wrappers.py
@@ -0,0 +1,66 @@
import six

from scrapy_streaming.spiders import StreamingSpider
from scrapy_streaming.utils import MessageError, RequiredField


class ExternalSpiderMessageWrapper(object):
validator = {}

def __init__(self, default, fields):
self.data = fields
self.validate(fields)
self.update(default, fields)

@classmethod
def from_dict(cls, data):
return cls(data)

def validate(self, data):
validator = self.validator
for key, value in data.items():
if key not in validator:
raise MessageError('Unknown message field: %s' % key)

if value is not None and not isinstance(value, validator[key]):
raise MessageError('%s field must be defined as %s, received: %s' %
(key, validator[key].__name__, type(value).__name__))

def update(self, default, data):
default.update(data)
for item, value in default.items():
if isinstance(value, RequiredField):
raise MessageError('Required field: %s' % item)
setattr(self, item, value)


class RequestMessage(ExternalSpiderMessageWrapper):
validator = {'id': six.text_type, 'url': six.text_type}

def __init__(self, fields):
default = {'id': None, 'start_urls': None, 'method': None, 'meta': None,
'body': None, 'headers': None, 'cookies': None, 'encoding': None,
'priority': None, 'dont_filter': None}

super(RequestMessage, self).__init__(default, fields)


class SpiderMessage(ExternalSpiderMessageWrapper):
validator = {'name': six.text_type, 'start_urls': list,
'allowed_domains': list, 'custom_settings': dict}

def __init__(self, fields):
default = {'name': RequiredField(), 'start_urls': RequiredField(),
'allowed_domains': None, 'custom_settings': None}

super(SpiderMessage, self).__init__(default, fields)


class LogMessage(ExternalSpiderMessageWrapper):

validator = {'message': six.text_type, 'level': six.text_type}

def __init__(self, fields):
default = {'message': RequiredField(), 'level': RequiredField()}

super(LogMessage, self).__init__(default, fields)
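
The wrappers double as validators: unknown keys or wrongly typed values raise ``MessageError``, and ``RequiredField`` placeholders mark keys the external process must provide. A short sketch of that behaviour, using illustrative payloads:

from scrapy_streaming.communication.wrappers import LogMessage, RequestMessage
from scrapy_streaming.utils import MessageError

log = LogMessage.from_dict({'message': u'spider started', 'level': u'debug'})
print(log.message, log.level)  # validated fields become attributes

try:
    LogMessage.from_dict({'message': u'oops'})  # missing the required 'level' field
except MessageError as error:
    print(error)

try:
    RequestMessage.from_dict({'url': 123})  # wrong type: 'url' must be text
except MessageError as error:
    print(error)
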
31 changes: 22 additions & 9 deletions scrapy_streaming/external_spiderloader.py
@@ -1,6 +1,9 @@
import json
import os

from twisted.internet import reactor

from scrapy_streaming.process_streaming import ProcessStreamingProtocol
from scrapy_streaming.utils import get_project_root


@@ -9,7 +12,7 @@ class ExternalSpider(object):
Object to represent external spiders defined in ``external.json``.
"""

def __init__(self, name, command, args=None):
def __init__(self, name, command, args=[]):
if args is not None and not isinstance(args, list):
raise ValueError("'args' must be defined as an array of strings")
self.name = name
@@ -26,17 +29,19 @@ class ExternalSpiderLoader(object):
This class manages external spiders defined in the ``external.json``
"""

def __init__(self, settings):
path = settings.get('EXTERNAL_SPIDERS_PATH', get_project_root())
# TODO add EXTERNAL_SPIDERS_PATH in docs
path = os.path.abspath(path)
self.external = os.path.join(path, 'external.json')
def __init__(self, settings, load_spiders=True):
self._spiders = {}
self._fetch_spiders()

if load_spiders:
path = settings.get('EXTERNAL_SPIDERS_PATH', get_project_root())
# TODO add EXTERNAL_SPIDERS_PATH in docs
path = os.path.abspath(path)
self.external = os.path.join(path, 'external.json')
self._fetch_spiders()

@classmethod
def from_settings(cls, settings):
return cls(settings)
def from_settings(cls, settings, **kw):
return cls(settings, **kw)

def _fetch_spiders(self):
"""
@@ -59,6 +64,14 @@ def list(self):
"""
return list(self._spiders.values())

def crawl(self, name_or_spider):
if not isinstance(name_or_spider, ExternalSpider):
name_or_spider = self._spiders[name_or_spider]

protocol = ProcessStreamingProtocol()
reactor.spawnProcess(protocol, name_or_spider.command, args=[name_or_spider.command] + name_or_spider.args)
reactor.run()


def _read_json(path):
"""
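
The loader can also be driven directly from Python, which is essentially what the ``crawl`` and ``streaming`` commands above do. A minimal sketch, assuming an ``external.json`` with a ``compiled_spider`` entry exists at the project root (or at ``EXTERNAL_SPIDERS_PATH``); note that ``crawl()`` starts the Twisted reactor, so only one such call can be made per process:

from scrapy.utils.project import get_project_settings

from scrapy_streaming.external_spiderloader import ExternalSpider, ExternalSpiderLoader

settings = get_project_settings()

# Crawl a spider declared in external.json, looked up by name.
ExternalSpiderLoader.from_settings(settings).crawl('compiled_spider')

# Alternatively, build the spider in code and skip external.json,
# the same way the `streaming` command does:
# spider = ExternalSpider('StreamingSpider', '/home/user/my_executable', ['arg1', 'arg2'])
# ExternalSpiderLoader.from_settings(settings, load_spiders=False).crawl(spider)
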
44 changes: 44 additions & 0 deletions scrapy_streaming/line_receiver.py
@@ -0,0 +1,44 @@
from scrapy.utils.python import to_bytes
from twisted.internet import protocol


class LineProcessProtocol(protocol.ProcessProtocol, object):
"""
This class extends the twisted ProcessProtocol to split the incoming data in lines.
The data received by ``outReceived`` is added to an internal buffer and dispatched line by line via ``lineReceived``
"""

def __init__(self):
self.__buffer = b''
self.__delimiter = b'\n'

def outReceived(self, data):
"""
Implement the outReceived method, buffering the incoming data and
dispatching line by line in the ``lineReceived`` method.
"""
self.__buffer += data

lines = self.__buffer.splitlines()
if data.endswith(self.__delimiter):
self.__buffer = b''
else:
self.__buffer = lines.pop()

for line in lines:
self.lineReceived(line)

def lineReceived(self, line):
"""
Called with each complete line received from the process stdout. You must override this method to use this class.
"""
raise NotImplementedError

def writeLine(self, data):
"""
Write the data to the process stdin, appending the newline delimiter if necessary
"""
data = to_bytes(data)
if not data.endswith(b'\n'):
data += self.__delimiter
self.transport.write(data)
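
``LineProcessProtocol`` is the buffering base that the streaming protocol builds on; a subclass only has to provide ``lineReceived``. A minimal sketch (the class name and echo behaviour are illustrative):

from scrapy_streaming.line_receiver import LineProcessProtocol


class EchoProtocol(LineProcessProtocol):

    def lineReceived(self, line):
        # Each complete line from the child's stdout arrives here without its trailing newline.
        self.writeLine(b'echo: ' + line)
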
