Commit 915ad36: Initial commit (0 parents)

File tree: 7 files changed, +394 -0 lines changed


.gitignore (+3)

.python-version
drpyspark.egg-info
*.pyc

README.md (+67)
# drpyspark

The doctor is in.

drpyspark provides handy utilities for debugging and tuning pyspark programs.
A work in progress.

## Better debugging

Just add

```python
import drpyspark
drpyspark.enable_debug_output()
```

to your Spark script and you'll get output showing exactly what happens at
every stage of your pyspark program, without having to sprinkle print
statements between every transformation.

```python
def is_even(val):
    if val % 2 == 0:
        return [val]
    return []


def square(val):
    return val ** 2


with pyspark.SparkContext() as sc:
    numbers = sc.parallelize([str(x) for x in xrange(10)])
    even_squares = (numbers
                    .map(lambda l: int(l))
                    .map(square)
                    .flatMap(is_even))
    div_100 = even_squares.map(lambda l: l / 100.0)
    print(div_100.collect())
```

When run with `enable_debug_output`, the script produces:

```
/Users/mikesukmanowsky/code/drpyspark/examples/rdd_1.py:18: numbers = sc.parallelize([str(x) for x in xrange(10)])
['0', '1', '2', '3', '4']
/Users/mikesukmanowsky/code/drpyspark/examples/rdd_1.py:20: .map(lambda l: int(l))
[0, 1, 2, 3, 4]
/Users/mikesukmanowsky/code/drpyspark/examples/rdd_1.py:21: .map(square)
[0, 1, 4, 9, 16]
/Users/mikesukmanowsky/code/drpyspark/examples/rdd_1.py:22: .flatMap(is_even))
[0, 4, 16, 36, 64]
/Users/mikesukmanowsky/code/drpyspark/examples/rdd_1.py:23: div_100 = even_squares.map(lambda l: l / 100.0)
[0.0, 0.04, 0.16, 0.36, 0.64]
```
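
Each traced line shows the file and line number of the transformation, followed by a sample of (up to) the first five elements of the resulting RDD.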

## Running examples

You'll need to [download a release of Apache Spark](http://spark.apache.org/).
With a virtualenv activated, install `drpyspark` with `pip install drpyspark` (or
`python setup.py develop`), then run:

```
PYSPARK_PYTHON=$(which python) $SPARK_HOME/bin/spark-submit examples/<filename>
```
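
For instance, `examples/rdd_1.py` runs the pipeline shown above and produces the trace from the previous section.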

drpyspark/__init__.py (+62)

from __future__ import print_function
import functools
import inspect
import logging
import pprint
import sys

try:
    import pyspark
except ImportError:
    pyspark = None

from .version import VERSION

log = logging.getLogger(__package__)


def print_output(f, num_elements=5):
    @functools.wraps(f)
    def _debug_pyspark_call(*args, **kwargs):
        log.debug('%s called', f.__name__)
        stack = inspect.stack()
        caller = stack[1]
        caller_package = inspect.getmodule(caller[0]).__package__

        result = f(*args, **kwargs)
        if caller_package in ('pyspark', __package__):
            log.debug('%s internal call to %s, returning to avoid infinite '
                      'recursion', caller_package, f.__name__)
            return result
        if not isinstance(result, (pyspark.RDD, pyspark.sql.DataFrame)):
            log.debug('%s returned non RDD/DataFrame value, returning',
                      f.__name__)
            return result

        log.debug('printing %d from %s', num_elements, f.__name__)
        sample = result.take(num_elements)
        file, line_no, code = stack[-1][1], stack[-1][2], ''.join(stack[-1][4]).strip()
        print('{}:{}: {}'.format(file, line_no, code))
        pprint.pprint(sample)
        return result

    return _debug_pyspark_call


def enable_debug_output(num_elements=5):
    if pyspark is None:
        print('pyspark not found in PYTHONPATH, did you run via spark-submit?',
              file=sys.stderr)
        sys.exit(1)

    classes_to_patch = (pyspark.SparkContext, pyspark.sql.SQLContext,
                        pyspark.sql.HiveContext, pyspark.RDD,
                        pyspark.sql.DataFrame)
    for klass in classes_to_patch:
        members = inspect.getmembers(klass)
        # all public methods (anything not starting with an underscore)
        methods = [(name, member) for (name, member) in members
                   if not name.startswith('_') and inspect.ismethod(member)]
        for name, method in methods:
            setattr(klass, name, print_output(method, num_elements))
            log.debug('Patched %s.%s.', klass.__name__, name)
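
The pattern above (wrap each public method and `setattr` the wrapper back onto the class) can be tried in isolation. A minimal sketch, assuming Python 2 semantics where unbound methods satisfy `inspect.ismethod`; the `Echo` class is hypothetical and not part of drpyspark:

```python
from __future__ import print_function
import functools
import inspect


class Echo(object):
    def double(self, x):
        return x * 2


def traced(f):
    # Same shape as print_output, minus the pyspark-specific checks.
    @functools.wraps(f)
    def wrapper(*args, **kwargs):
        result = f(*args, **kwargs)
        print('{} -> {!r}'.format(f.__name__, result))
        return result
    return wrapper


# Patch every public method, mirroring the loop in enable_debug_output.
for name, member in inspect.getmembers(Echo):
    if not name.startswith('_') and inspect.ismethod(member):
        setattr(Echo, name, traced(member))

Echo().double(21)  # prints: double -> 42
```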

drpyspark/version.py (+11)

def _safe_int(string):
    """Simple function to convert strings into ints without dying.
    Helps when we define versions like 0.1.0dev."""
    try:
        return int(string)
    except ValueError:
        return string


__version__ = '0.1.0'
VERSION = tuple(_safe_int(x) for x in __version__.split('.'))
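
For illustration, here is what `VERSION` would parse to for the release string above and for a dev-style string like the `0.1.0dev` mentioned in the docstring (assuming `_safe_int` as defined above):

```python
assert tuple(_safe_int(x) for x in '0.1.0'.split('.')) == (0, 1, 0)
assert tuple(_safe_int(x) for x in '0.1.0dev'.split('.')) == (0, 1, '0dev')
```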

examples/rdd_1.py (+25)

from __future__ import print_function
import pyspark
import drpyspark
drpyspark.enable_debug_output()


def is_even(val):
    if val % 2 == 0:
        return [val]
    return []


def square(val):
    return val ** 2


with pyspark.SparkContext() as sc:
    numbers = sc.parallelize([str(x) for x in xrange(10)])
    even_squares = (numbers
                    .map(lambda l: int(l))
                    .map(square)
                    .flatMap(is_even))
    div_100 = even_squares.map(lambda l: l / 100.0)
    print(div_100.collect())