
Commit 22cd303

Author: Luke Lovett
Parent: aecd367

Ensure that BSONPickler and custom constructors are registered on every Spark node (HADOOP-273).
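Previously, pymongo_spark registered BSONPickler and the custom IObjectConstructors lazily from the driver over py4j (via _ensure_pickles), so the registrations only ever took effect in the driver's JVM; executors that actually pickled or unpickled BSON records never saw them. This commit moves registration into static initializer blocks on thin PySpark* subclasses of the Hadoop input and output formats, so any JVM that loads one of these classes, driver or worker, performs the registration itself.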

8 files changed: 131 additions, 86 deletions
spark/src/main/java/com/mongodb/spark/PySparkBSONFileInputFormat.java

Lines changed: 16 additions & 0 deletions

@@ -0,0 +1,16 @@
+package com.mongodb.spark;
+
+import com.mongodb.hadoop.BSONFileInputFormat;
+import com.mongodb.spark.pickle.RegisterConstructors;
+import com.mongodb.spark.pickle.RegisterPickles;
+
+public class PySparkBSONFileInputFormat extends BSONFileInputFormat {
+    private static final RegisterPickles PICKLES = new RegisterPickles();
+    private static final RegisterConstructors CONSTRUCTORS =
+            new RegisterConstructors();
+
+    static {
+        PICKLES.register();
+        CONSTRUCTORS.register();
+    }
+}
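The subclass adds no behavior of its own: the point is that naming it as the input format forces every executor JVM that loads the class to run its static registration block first. Below is a minimal sketch of using the class directly from PySpark, without the pymongo_spark helpers; the SparkContext setup, app name, and input path are illustrative assumptions, not part of this commit.

# A hedged sketch: read a BSON dump by naming the wrapper class directly.
# Loading com.mongodb.spark.PySparkBSONFileInputFormat on an executor runs
# its static block, registering the picklers/constructors in that JVM.
from pyspark import SparkContext

sc = SparkContext(appName='bson-read-example')  # illustrative app name
rdd = sc.newAPIHadoopFile(
    'hdfs:///data/dump.bson',  # hypothetical input path
    inputFormatClass='com.mongodb.spark.PySparkBSONFileInputFormat',
    keyClass='com.mongodb.hadoop.io.BSONWritable',
    valueClass='com.mongodb.hadoop.io.BSONWritable')
print(rdd.first())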
spark/src/main/java/com/mongodb/spark/PySparkBSONFileOutputFormat.java

Lines changed: 17 additions & 0 deletions

@@ -0,0 +1,17 @@
+package com.mongodb.spark;
+
+import com.mongodb.hadoop.BSONFileOutputFormat;
+import com.mongodb.spark.pickle.RegisterConstructors;
+import com.mongodb.spark.pickle.RegisterPickles;
+
+public class PySparkBSONFileOutputFormat<K, V>
+        extends BSONFileOutputFormat<K, V> {
+    private static final RegisterPickles PICKLES = new RegisterPickles();
+    private static final RegisterConstructors CONSTRUCTORS =
+            new RegisterConstructors();
+
+    static {
+        PICKLES.register();
+        CONSTRUCTORS.register();
+    }
+}
spark/src/main/java/com/mongodb/spark/PySparkMongoInputFormat.java

Lines changed: 20 additions & 0 deletions

@@ -0,0 +1,20 @@
+package com.mongodb.spark;
+
+import com.mongodb.hadoop.MongoInputFormat;
+import com.mongodb.spark.pickle.RegisterConstructors;
+import com.mongodb.spark.pickle.RegisterPickles;
+
+/**
+ * InputFormat that attaches custom Picklers and IObjectConstructors for
+ * reading and writing BSON types with PyMongo.
+ */
+public class PySparkMongoInputFormat extends MongoInputFormat {
+    private static final RegisterPickles PICKLES = new RegisterPickles();
+    private static final RegisterConstructors CONSTRUCTORS =
+            new RegisterConstructors();
+
+    static {
+        PICKLES.register();
+        CONSTRUCTORS.register();
+    }
+}
spark/src/main/java/com/mongodb/spark/PySparkMongoOutputFormat.java

Lines changed: 17 additions & 0 deletions

@@ -0,0 +1,17 @@
+package com.mongodb.spark;
+
+import com.mongodb.hadoop.MongoOutputFormat;
+import com.mongodb.spark.pickle.RegisterConstructors;
+import com.mongodb.spark.pickle.RegisterPickles;
+
+public class PySparkMongoOutputFormat<K, V>
+        extends MongoOutputFormat<K, V> {
+    private static final RegisterPickles PICKLES = new RegisterPickles();
+    private static final RegisterConstructors CONSTRUCTORS =
+            new RegisterConstructors();
+
+    static {
+        PICKLES.register();
+        CONSTRUCTORS.register();
+    }
+}
spark/src/main/java/com/mongodb/spark/pickle/RegisterConstructors.java

Lines changed: 31 additions & 0 deletions

@@ -0,0 +1,31 @@
+package com.mongodb.spark.pickle;
+
+import net.razorvine.pickle.Unpickler;
+import org.bson.BSON;
+
+public class RegisterConstructors {
+    public void register() {
+        Unpickler.registerConstructor("bson.binary", "Binary",
+            new com.mongodb.spark.pickle.BinaryConstructor());
+        Unpickler.registerConstructor("bson.code", "Code",
+            new com.mongodb.spark.pickle.CodeConstructor());
+        Unpickler.registerConstructor("bson.dbref", "DBRef",
+            new com.mongodb.spark.pickle.DBRefConstructor());
+        Unpickler.registerConstructor("bson.int64", "Int64",
+            new com.mongodb.spark.pickle.Int64Constructor());
+        Unpickler.registerConstructor("bson.max_key", "MaxKey",
+            new com.mongodb.spark.pickle.MaxKeyConstructor());
+        Unpickler.registerConstructor("bson.min_key", "MinKey",
+            new com.mongodb.spark.pickle.MinKeyConstructor());
+        Unpickler.registerConstructor("bson.timestamp", "Timestamp",
+            new com.mongodb.spark.pickle.TimestampConstructor());
+        Unpickler.registerConstructor("bson.regex", "Regex",
+            new com.mongodb.spark.pickle.RegexConstructor());
+        Unpickler.registerConstructor("bson.objectid", "ObjectId",
+            new com.mongodb.spark.pickle.ObjectIdConstructor());
+
+        BSON.addEncodingHook(
+            java.util.GregorianCalendar.class,
+            new CalendarTransformer());
+    }
+}
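Each constructor is keyed by the (module, class) pair that a pickled PyMongo object references in its pickle stream. A quick way to confirm those keys on any machine with pymongo installed (a hedged illustration; the pickle protocol is pinned only to keep the disassembly stable):

# Disassemble a pickled PyMongo ObjectId: a GLOBAL opcode names
# 'bson.objectid ObjectId', the same pair registered above.
import pickle
import pickletools

from bson.objectid import ObjectId  # requires pymongo

pickletools.dis(pickle.dumps(ObjectId(), protocol=2))
# expect a line like: GLOBAL 'bson.objectid ObjectId'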
spark/src/main/java/com/mongodb/spark/pickle/RegisterPickles.java

Lines changed: 23 additions & 0 deletions

@@ -0,0 +1,23 @@
+package com.mongodb.spark.pickle;
+
+import net.razorvine.pickle.Pickler;
+
+public class RegisterPickles {
+    private static final BSONPickler PICKLER = new BSONPickler();
+
+    public void register() {
+        Pickler.registerCustomPickler(org.bson.types.ObjectId.class, PICKLER);
+        Pickler.registerCustomPickler(org.bson.types.Binary.class, PICKLER);
+        Pickler.registerCustomPickler(org.bson.types.Code.class, PICKLER);
+        Pickler.registerCustomPickler(org.bson.types.CodeWScope.class, PICKLER);
+        Pickler.registerCustomPickler(
+            org.bson.types.CodeWithScope.class, PICKLER);
+        Pickler.registerCustomPickler(org.bson.types.MaxKey.class, PICKLER);
+        Pickler.registerCustomPickler(org.bson.types.MinKey.class, PICKLER);
+        Pickler.registerCustomPickler(
+            org.bson.types.BSONTimestamp.class, PICKLER);
+        Pickler.registerCustomPickler(com.mongodb.DBRef.class, PICKLER);
+        Pickler.registerCustomPickler(java.util.regex.Pattern.class, PICKLER);
+        Pickler.registerCustomPickler(java.util.Date.class, PICKLER);
+    }
+}
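RegisterPickles handles the reverse direction: when an executor hands records to a Python worker, BSONPickler serializes each of these Java types into pickle opcodes that resolve to the corresponding PyMongo class on the Python side, which is why the README change below asks for pymongo on every machine in the cluster. Note that both the legacy CodeWScope and the newer CodeWithScope representations are registered individually.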

spark/src/main/python/README.rst

Lines changed: 3 additions & 0 deletions

@@ -23,6 +23,9 @@ Installation
     cd mongo-hadoop/spark/src/main/python
     python setup.py install
 
+3. Install `pymongo <https://pypi.python.org/pypi/pymongo>`_ on each machine in
+   your Spark cluster.
+
 You'll also need to put ``mongo-hadoop-spark.jar`` (see above for instructions
 on how to obtain this) somewhere on Spark's ``CLASSPATH`` prior to using this
 package.
spark/src/main/python/pymongo_spark.py

Lines changed: 4 additions & 86 deletions

@@ -14,89 +14,9 @@
 
 __version__ = '0.1'
 
-import sys
-
-import py4j
 import pyspark
 
 
-# These types need special pickling to work correctly with PyMongo.
-_PICKLE_BSON_TYPES = (
-    'org.bson.types.ObjectId',
-    'org.bson.types.Binary',
-    'org.bson.types.Code',
-    'org.bson.types.CodeWScope',
-    'org.bson.types.CodeWithScope',
-    'org.bson.types.MaxKey',
-    'org.bson.types.MinKey',
-    'org.bson.types.BSONTimestamp',
-    'com.mongodb.DBRef',
-    'java.util.regex.Pattern',
-    'java.util.Date'
-)
-
-
-# Register Constructors for unpickling.
-# (module, class)
-_UNPICKLE_CONSTRUCTORS = (
-    ('bson.binary', 'Binary'),
-    ('bson.code', 'Code'),
-    ('bson.dbref', 'DBRef'),
-    ('bson.int64', 'Int64'),
-    ('bson.max_key', 'MaxKey'),
-    ('bson.min_key', 'MinKey'),
-    ('bson.timestamp', 'Timestamp'),
-    ('bson.regex', 'Regex'),
-    ('bson.objectid', 'ObjectId')
-)
-
-
-def _ensure_pickles(self):
-    if not getattr(self, '__registered_picklers', False):
-        try:
-            jvm = self._jvm
-            pickler = jvm.net.razorvine.pickle.Pickler
-            bson_pickler = jvm.com.mongodb.spark.pickle.BSONPickler()
-
-            for pbt in _PICKLE_BSON_TYPES:
-                pickler.registerCustomPickler(
-                    jvm.java.lang.Class.forName(pbt), bson_pickler)
-
-            unpickler = jvm.net.razorvine.pickle.Unpickler
-            for unpc in _UNPICKLE_CONSTRUCTORS:
-                unpickler.registerConstructor(
-                    unpc[0], unpc[1],
-                    jvm.java.lang.Class.forName(
-                        'com.mongodb.spark.pickle.%sConstructor' % unpc[1])
-                    .newInstance())
-
-            # Register CalendarTransformer with the Java driver so that we can
-            # encode java.util.GregorianCalendar objects. GregorianCalendar is
-            # what is constructed out of pickled datetime objects.
-            # We can't create a custom IObjectConstructor like we do for other
-            # BSON types, because the Razorvine library already has a
-            # constructor for datetimes.
-            jvm.org.bson.BSON.addEncodingHook(
-                # SyntaxError to access ".class" attribute.
-                jvm.java.lang.Class.forName('java.util.GregorianCalendar'),
-                jvm.java.lang.Class.forName(
-                    'com.mongodb.spark.pickle.CalendarTransformer')
-                .newInstance())
-            self.__registered_picklers = True
-        except py4j.protocol.Py4JError:
-            orig_t, orig_v, orig_tb = sys.exc_info()
-            try:
-                # Try to guess most common cause of failure.
-                raise (py4j.protocol.Py4JError,
-                       "Error while communicating with the JVM. "
-                       "Is the MongoDB Spark jar on Spark's CLASSPATH? : " +
-                       str(orig_v),
-                       orig_tb)
-            finally:
-                # Avoid circular reference with traceback.
-                del orig_tb
-
-
 def saveToMongoDB(self, connection_string, config=None):
     """Save this RDD to MongoDB."""
     conf = {'mongo.output.uri': connection_string}

@@ -112,7 +32,7 @@ def saveToMongoDB(self, connection_string, config=None):
     keyClass = 'org.apache.hadoop.io.NullWritable'
     to_save.saveAsNewAPIHadoopFile(
         'file:///this-is-unused',
-        outputFormatClass='com.mongodb.hadoop.MongoOutputFormat',
+        outputFormatClass='com.mongodb.spark.PySparkMongoOutputFormat',
         keyClass=keyClass,
         valueClass='com.mongodb.hadoop.io.BSONWritable',
         keyConverter='com.mongodb.spark.pickle.NoopConverter',

@@ -132,7 +52,7 @@ def saveToBSON(self, file_path, config=None):
     keyClass = 'org.apache.hadoop.io.NullWritable'
     to_save.saveAsNewAPIHadoopFile(
         file_path,
-        outputFormatClass='com.mongodb.hadoop.BSONFileOutputFormat',
+        outputFormatClass='com.mongodb.spark.PySparkBSONFileOutputFormat',
         keyClass=keyClass,
         valueClass='com.mongodb.hadoop.io.BSONWritable',
         keyConverter='com.mongodb.spark.pickle.NoopConverter',

@@ -143,23 +63,21 @@ def saveToBSON(self, file_path, config=None):
 
 def BSONFilePairRDD(self, file_path, config=None):
     """Create a pair RDD backed by a BSON file."""
-    _ensure_pickles(self)
     return self.newAPIHadoopFile(
         file_path,
-        inputFormatClass='com.mongodb.hadoop.BSONFileInputFormat',
+        inputFormatClass='com.mongodb.spark.PySparkBSONFileInputFormat',
         keyClass='com.mongodb.hadoop.io.BSONWritable',
         valueClass='com.mongodb.hadoop.io.BSONWritable',
         conf=config)
 
 
 def mongoPairRDD(self, connection_string, config=None):
     """Create a pair RDD backed by MongoDB."""
-    _ensure_pickles(self)
     conf = {'mongo.input.uri': connection_string}
     if config:
         conf.update(config)
     return self.newAPIHadoopRDD(
-        inputFormatClass='com.mongodb.hadoop.MongoInputFormat',
+        inputFormatClass='com.mongodb.spark.PySparkMongoInputFormat',
         keyClass='com.mongodb.hadoop.io.BSONWritable',
         valueClass='com.mongodb.hadoop.io.BSONWritable',
         conf=conf)
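Taken together, a job no longer depends on the driver registering picklers over py4j before the first read or write; registration happens in whichever JVM loads the format classes. A minimal end-to-end sketch of the fixed flow (hedged: pymongo_spark.activate() is assumed to monkey-patch these methods onto SparkContext and RDD as described in this package's README, and the URI, app name, and collection are illustrative):

# A hedged end-to-end sketch of reading and writing through the new
# PySpark* format classes via the pymongo_spark helpers.
import datetime

from pyspark import SparkContext
import pymongo_spark

pymongo_spark.activate()
sc = SparkContext(appName='pymongo-spark-example')  # illustrative app name

# Writing goes through PySparkMongoOutputFormat, whose static block
# registers BSONPickler on every node that loads it.
sc.parallelize([{'when': datetime.datetime(2015, 6, 1)}]) \
  .saveToMongoDB('mongodb://localhost:27017/test.events')  # illustrative URI

# Reading goes through PySparkMongoInputFormat; values unpickle into
# PyMongo/standard types (e.g. datetime.datetime) on the Python workers.
doc = sc.mongoPairRDD('mongodb://localhost:27017/test.events').values().first()
assert isinstance(doc['when'], datetime.datetime)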
