9
9
from os import environ
10
10
from sys import maxint
11
11
12
- def ListTweets (bucket ):
12
+ def ListTweets (bucket , streaming = False ):
13
13
"""Dump all keys in the bucket to stdout"""
14
14
decoder = bucket .get_decoder ("application/json" )
15
- allkeys = bucket .get_keys ()
15
+ if streaming :
16
+ allkeys = bucket .stream_keys ()
17
+ allkeys = [item for sublist in allkeys for item in sublist ]
18
+ else :
19
+ allkeys = bucket .get_keys ()
16
20
bucket .allow_mult = True
17
21
count = 0
18
22
if allkeys == None :
19
23
print "No Tweets."
20
24
return
25
+ print "Total keys = {0}" .format (len (allkeys ))
21
26
for key in allkeys :
22
27
count = count + 1
23
28
obj = bucket .get (key )
24
29
for sibling in obj .siblings :
25
30
json = sibling .encoded_data
26
- dict = decoder (json )
27
- print "[%d] - %s - %s at %s" % (count ,key ,dict ['user' ],dict ['time' ])
28
- print dict ['tweet' ].encode ("utf-8" )
31
+ jsdict = decoder (json )
32
+ print "[%d] - %s - %s at %s" % (count ,key ,jsdict ['user' ],jsdict ['time' ])
33
+ print jsdict ['tweet' ].encode ("utf-8" )
29
34
links = sibling .links
30
35
for parent_link in links :
31
36
print "LINK"
@@ -34,9 +39,9 @@ def ListTweets(bucket):
34
39
for psibling in pobj .siblings :
35
40
pjson = psibling .encoded_data
36
41
if pjson != None :
37
- dict = decoder (pjson )
38
- print "\t PARENT - %s at %s" % (dict ['user' ],dict ['time' ])
39
- print "\t %s" % dict ['tweet' ].encode ("utf-8" )
42
+ jsdict = decoder (pjson )
43
+ print "\t PARENT - %s at %s" % (jsdict ['user' ],jsdict ['time' ])
44
+ print "\t %s" % jsdict ['tweet' ].encode ("utf-8" )
40
45
41
46
42
47
def DeleteTweets (bucket ):
@@ -83,7 +88,8 @@ def LoadTweets(protocol, bucket, quantity, term):
83
88
tweet = bucket .new (str (status .id ), data = {
84
89
'tweet' : status .text .encode ("utf-8" ),
85
90
'user' : status .user .screen_name ,
86
- 'time' : dt .isoformat ()
91
+ 'time' : dt .isoformat () + 'Z'
92
+ # Add 'Z' for Solr Compatibility
87
93
})
88
94
tweet .add_index ('user_bin' ,status .user .screen_name )
89
95
if parent != None :
@@ -93,7 +99,7 @@ def LoadTweets(protocol, bucket, quantity, term):
93
99
tweet .add_link (parent_node )
94
100
tweet .store ()
95
101
96
- def SearchTweets (client , bucket , term ):
102
+ def SearchOldTweets (client , bucket , term ):
97
103
# First parameter is the bucket we want to search within, the second
98
104
# is the query we want to perform.
99
105
print 'tweet:{0}' .format (term )
@@ -107,6 +113,26 @@ def SearchTweets(client, bucket, term):
107
113
print "%d = %s - %s at %s" % (count , item ['id' ], item ['user' ], item ['time' ])
108
114
print item ['tweet' ]
109
115
116
+
117
+ def SearchTweets (client , bucket , term ):
118
+ # First parameter is the bucket we want to search within, the second
119
+ # is the query we want to perform against Riak 2.0 aka Yokozuna
120
+
121
+ # Format <key>:*<value>*
122
+ results = bucket .search (term )
123
+ count = 0
124
+ decoder = bucket .get_decoder ("application/json" )
125
+ for item in results ['docs' ]:
126
+ count = count + 1
127
+ key = item ['_yz_rk' ]
128
+ obj = bucket .get (key )
129
+ for sibling in obj .siblings :
130
+ json = sibling .encoded_data
131
+ jsdict = decoder (json )
132
+ print "[%d] - %s - %s at %s" % (count ,key ,jsdict ['user' ],jsdict ['time' ])
133
+ print jsdict ['tweet' ].encode ("utf-8" )
134
+
135
+
110
136
def Search2iTweets (bucket , term ):
111
137
decoder = bucket .get_decoder ("application/json" )
112
138
result = bucket .get_index ('user_bin' , term )
@@ -116,9 +142,9 @@ def Search2iTweets(bucket, term):
116
142
count = count + 1
117
143
obj = bucket .get (key )
118
144
json = obj .get_encoded_data ()
119
- dict = decoder (json )
120
- print "[%d] - %s at %s" % (count ,dict ['user' ],dict ['time' ])
121
- print dict ['tweet' ]
145
+ jsdict = decoder (json )
146
+ print "[%d] - %s at %s" % (count ,jsdict ['user' ],jsdict ['time' ])
147
+ print jsdict ['tweet' ]
122
148
123
149
def MapReduceTweets (client , bucket , term ):
124
150
query = client .add (bucket .name )
@@ -129,6 +155,19 @@ def MapReduceTweets(client, bucket, term):
129
155
# Print the key (``v.key``) and the value for that key (``data``).
130
156
print "%s - %s" % (result [0 ], result [1 ])
131
157
158
+ def CreateSearchSchema (client , name ):
159
+ xml_file = open (name + '.xml' , 'r' )
160
+ schema_data = xml_file .read ()
161
+ client .create_search_schema (name , schema_data )
162
+ xml_file .close ()
163
+ client .create_search_index (name , name )
164
+ time .sleep (5 )
165
+
166
+ # Associate bucket with search index
167
+ bucket = client .bucket ('twitter' )
168
+ bucket .set_property ('search_index' , 'twitter' )
169
+
170
+
132
171
# MAIN
133
172
parser = argparse .ArgumentParser (description = 'Brew us some fresh, hot Riak!' )
134
173
parser .add_argument ('-p' ,'--dump' , help = 'Dump all tweets' , action = 'store_true' )
@@ -138,10 +177,11 @@ def MapReduceTweets(client, bucket, term):
138
177
parser .add_argument ('--host' , help = 'Hostname' , default = 'localhost' )
139
178
parser .add_argument ('-t' ,'--http' , type = int , help = 'HTTP port number' , default = 10018 )
140
179
parser .add_argument ('-b' ,'--pbc' , type = int , help = 'Protocol Buffer port number' , default = 10017 )
141
- parser .add_argument ('--protocol' , help = 'Name of transport protocol to use' , default = 'http ' , choices = ['http' , 'https ' ,'pbc' ])
180
+ parser .add_argument ('--protocol' , help = 'Name of transport protocol to use' , default = 'pbc ' , choices = ['http' ,'pbc' ])
142
181
parser .add_argument ('-x' ,'--delete' , help = 'Delete all tweets' , action = 'store_true' )
143
182
parser .add_argument ('-2' ,'--twoi' , help = 'Query 2i' )
144
183
parser .add_argument ('-mr' ,'--mapreduce' , help = 'Test MapReduce to look for a user' 's tweets' )
184
+ parser .add_argument ('-sch' ,'--schema' , help = 'Create a YZ search schema from XML file' )
145
185
args = parser .parse_args ()
146
186
print args
147
187
@@ -161,7 +201,7 @@ def MapReduceTweets(client, bucket, term):
161
201
DeleteTweets (bucket )
162
202
elif args .dump :
163
203
print "Dumping all existing tweets in Riak"
164
- ListTweets (bucket )
204
+ ListTweets (bucket , False )
165
205
elif args .twoi != None :
166
206
Search2iTweets (bucket , args .twoi )
167
207
elif args .mapreduce != None :
@@ -172,3 +212,6 @@ def MapReduceTweets(client, bucket, term):
172
212
elif args .search != None :
173
213
print "Searching for term '%s' in loaded tweets" % args .search
174
214
SearchTweets (client , bucket , args .search )
215
+ elif args .schema != None :
216
+ print "Creating schema '%s'" % args .schema
217
+ CreateSearchSchema (client , args .schema )
0 commit comments